Repository: kafka
Updated Branches:
  refs/heads/trunk a0ad9f156 -> 2656659e0
MINOR: Update TransactionManager to use LogContext

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3852 from hachikuji/minor-use-log-context-txn-manager

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2656659e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2656659e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2656659e

Branch: refs/heads/trunk
Commit: 2656659e0d7c0e427768ce216df2698acc8c9b11
Parents: a0ad9f1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 14 11:23:53 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu Sep 14 11:24:25 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  6 +--
 .../clients/producer/internals/Sender.java      | 12 ++---
 .../producer/internals/TransactionManager.java  | 53 +++++++++-----------
 .../clients/producer/internals/SenderTest.java  | 24 ++++-----
 .../internals/TransactionManagerTest.java       |  3 +-
 5 files changed, 46 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 816566f..18248bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
             this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 
-            this.transactionManager = configureTransactionState(config, log);
+            this.transactionManager = configureTransactionState(config, logContext, log);
             int retries = configureRetries(config, transactionManager != null, log);
             int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
             short acks = configureAcks(config, transactionManager != null, log);
@@ -429,7 +429,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private static TransactionManager configureTransactionState(ProducerConfig config, Logger log) {
+    private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
 
         TransactionManager transactionManager = null;
 
@@ -453,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
             int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-            transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, retryBackoffMs);
+            transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
             if (transactionManager.isTransactional())
                 log.info("Instantiated a transactional producer.");
             else
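The KafkaProducer change above threads the producer's existing LogContext through
configureTransactionState() so that TransactionManager can obtain a prefixed logger
instead of formatting its own prefix into every message. A minimal sketch of the
pattern; the prefix string below is illustrative, not the producer's actual format:

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    // LogContext wraps SLF4J loggers so that every message carries a fixed prefix.
    LogContext logContext = new LogContext("[Producer clientId=producer-1] ");  // hypothetical prefix
    Logger log = logContext.logger(TransactionManager.class);

    // No "{}" placeholder or explicit prefix argument is needed any more:
    log.info("ProducerId set to {} with epoch {}", 123456L, (short) 0);
    // logs: [Producer clientId=producer-1] ProducerId set to 123456 with epoch 0
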
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index bf3714e..8da411c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -346,15 +346,14 @@ public class Sender implements Runnable {
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
                             requestBuilder, now, true, nextRequestHandler);
                     transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
-                    log.debug("{}Sending transactional request {} to node {}",
-                            transactionManager.logPrefix, requestBuilder, targetNode);
+                    log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
                     client.send(clientRequest, now);
                     return true;
                 }
             } catch (IOException e) {
-                log.debug("{}Disconnect from {} while trying to send request {}. Going " +
-                        "to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder);
+                log.debug("Disconnect from {} while trying to send request {}. Going " +
+                        "to back off and retry", targetNode, requestBuilder);
                 if (nextRequestHandler.needsCoordinator()) {
                     // We break here so that we pick up the FindCoordinator request immediately.
                     transactionManager.lookupCoordinator(nextRequestHandler);
@@ -372,10 +371,7 @@ private void maybeAbortBatches(RuntimeException exception) {
         if (accumulator.hasIncomplete()) {
-            String logPrefix = "";
-            if (transactionManager != null)
-                logPrefix = transactionManager.logPrefix;
-            log.error("{}Aborting producer batches due to fatal error", logPrefix, exception);
+            log.error("Aborting producer batches due to fatal error", exception);
             accumulator.abortBatches(exception);
         }
     }
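Because Sender's own log field is already created from the shared LogContext, the
manual prefix plumbing removed above was redundant. Side by side, from the hunks
in this file:

    // Before: prefix passed explicitly as the first format argument.
    log.debug("{}Sending transactional request {} to node {}",
            transactionManager.logPrefix, requestBuilder, targetNode);

    // After: the LogContext-backed logger prepends the prefix itself.
    log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);

This is also what lets maybeAbortBatches() drop its null check on transactionManager:
no prefix has to be fetched from it before logging.
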
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index fad0332..05d943c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -41,8 +41,8 @@ import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -59,14 +59,12 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
  * A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
  */
 public class TransactionManager {
-    private static final Logger log = LoggerFactory.getLogger(TransactionManager.class);
     private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
 
+    private final Logger log;
     private final String transactionalId;
     private final int transactionTimeoutMs;
 
-    public final String logPrefix;
-
     private final Map<TopicPartition, Integer> sequenceNumbers;
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
     private final Set<TopicPartition> newPartitionsInTransaction;
@@ -142,11 +140,11 @@ public class TransactionManager {
         }
     }
 
-    public TransactionManager(String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
+    public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
         this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
         this.sequenceNumbers = new HashMap<>();
         this.transactionalId = transactionalId;
-        this.logPrefix = transactionalId == null ? "" : "[TransactionalId " + transactionalId + "] ";
+        this.log = logContext.logger(TransactionManager.class);
         this.transactionTimeoutMs = transactionTimeoutMs;
         this.transactionCoordinator = null;
         this.consumerGroupCoordinator = null;
@@ -165,7 +163,7 @@ public class TransactionManager {
     }
 
     TransactionManager() {
-        this(null, 0, 100);
+        this(new LogContext(), null, 0, 100);
     }
 
     public synchronized TransactionalRequestResult initializeTransactions() {
@@ -221,7 +219,7 @@ public class TransactionManager {
             throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
                     "active transaction");
 
-        log.debug("{}Begin adding offsets {} for consumer group {} to transaction", logPrefix, offsets, consumerGroupId);
+        log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, consumerGroupId);
         AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
                 producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
         AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
@@ -235,7 +233,7 @@ public class TransactionManager {
         if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
             return;
 
-        log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
+        log.debug("Begin adding new partition {} to transaction", topicPartition);
         newPartitionsInTransaction.add(topicPartition);
     }
 
@@ -338,8 +336,7 @@ public class TransactionManager {
      * Set the producer id and epoch atomically.
      */
     void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
-        log.info("{}ProducerId set to {} with epoch {}", logPrefix, producerIdAndEpoch.producerId,
-                producerIdAndEpoch.epoch);
+        log.info("ProducerId set to {} with epoch {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
         this.producerIdAndEpoch = producerIdAndEpoch;
     }
 
@@ -403,23 +400,23 @@ public class TransactionManager {
             pendingRequests.poll();
 
         if (maybeTerminateRequestWithError(nextRequestHandler)) {
-            log.trace("{}Not sending transactional request {} because we are in an error state",
-                    logPrefix, nextRequestHandler.requestBuilder());
+            log.trace("Not sending transactional request {} because we are in an error state",
+                    nextRequestHandler.requestBuilder());
             return null;
         }
 
         if (nextRequestHandler.isEndTxn() && !transactionStarted) {
             nextRequestHandler.result.done();
             if (currentState != State.FATAL_ERROR) {
-                log.debug("{}Not sending EndTxn for completed transaction since no partitions " +
-                        "or offsets were successfully added", logPrefix);
+                log.debug("Not sending EndTxn for completed transaction since no partitions " +
+                        "or offsets were successfully added");
                 completeTransaction();
             }
             nextRequestHandler = pendingRequests.poll();
         }
 
         if (nextRequestHandler != null)
-            log.trace("{}Request {} dequeued for sending", logPrefix, nextRequestHandler.requestBuilder());
+            log.trace("Request {} dequeued for sending", nextRequestHandler.requestBuilder());
 
         return nextRequestHandler;
     }
@@ -507,9 +504,9 @@ public class TransactionManager {
         }
 
         if (lastError != null)
-            log.debug("{}Transition from state {} to error state {}", logPrefix, currentState, target, lastError);
+            log.debug("Transition from state {} to error state {}", currentState, target, lastError);
         else
-            log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
+            log.debug("Transition from state {} to {}", currentState, target);
 
         currentState = target;
     }
@@ -537,7 +534,7 @@ public class TransactionManager {
     }
 
     private void enqueueRequest(TxnRequestHandler requestHandler) {
-        log.debug("{}Enqueuing transactional request {}", logPrefix, requestHandler.requestBuilder());
+        log.debug("Enqueuing transactional request {}", requestHandler.requestBuilder());
         pendingRequests.add(requestHandler);
     }
 
@@ -634,15 +631,15 @@ public class TransactionManager {
             } else {
                 clearInFlightRequestCorrelationId();
                 if (response.wasDisconnected()) {
-                    log.debug("{}Disconnected from {}. Will retry.", logPrefix, response.destination());
+                    log.debug("Disconnected from {}. Will retry.", response.destination());
                     if (this.needsCoordinator())
                         lookupCoordinator(this.coordinatorType(), this.coordinatorKey());
                     reenqueue();
                 } else if (response.versionMismatch() != null) {
                     fatalError(response.versionMismatch());
                 } else if (response.hasResponse()) {
-                    log.trace("{}Received transactional response {} for request {}", logPrefix,
-                            response.responseBody(), requestBuilder());
+                    log.trace("Received transactional response {} for request {}", response.responseBody(),
+                            requestBuilder());
                     synchronized (TransactionManager.this) {
                         handleResponse(response.responseBody());
                     }
@@ -781,10 +778,11 @@ public class TransactionManager {
                 } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                     unauthorizedTopics.add(topicPartition.topic());
                 } else if (error == Errors.OPERATION_NOT_ATTEMPTED) {
-                    log.debug("{}Did not attempt to add partition {} to transaction because other partitions in the batch had errors.", logPrefix, topicPartition);
+                    log.debug("Did not attempt to add partition {} to transaction because other partitions in the " +
+                            "batch had errors.", topicPartition);
                     hasPartitionErrors = true;
                 } else {
-                    log.error("{}Could not add partition {} due to unexpected error {}", logPrefix, topicPartition, error);
+                    log.error("Could not add partition {} due to unexpected error {}", topicPartition, error);
                     hasPartitionErrors = true;
                 }
             }
@@ -803,7 +801,7 @@ public class TransactionManager {
             } else if (hasPartitionErrors) {
                 abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
             } else {
-                log.debug("{}Successfully added partitions {} to transaction", logPrefix, partitions);
+                log.debug("Successfully added partitions {} to transaction", partitions);
                 partitionsInTransaction.addAll(partitions);
                 transactionStarted = true;
                 result.done();
@@ -956,8 +954,7 @@ public class TransactionManager {
             Errors error = addOffsetsToTxnResponse.error();
 
             if (error == Errors.NONE) {
-                log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
-                        builder.consumerGroupId());
+                log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());
 
                 // note the result is not completed until the TxnOffsetCommit returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
@@ -1019,7 +1016,7 @@ public class TransactionManager {
                 TopicPartition topicPartition = entry.getKey();
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
-                    log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", logPrefix,
+                    log.debug("Successfully added offsets {} from consumer group {} to transaction.",
                             builder.offsets(), builder.consumerGroupId());
                     pendingTxnOffsetCommits.remove(topicPartition);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 85b5ba6..6f98e52 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -104,7 +104,7 @@ public class SenderTest {
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
    private SenderMetricsRegistry senderMetricsRegistry = null;
-    private final LogContext loggerFactory = new LogContext();
+    private final LogContext logContext = new LogContext();
 
     @Before
     public void setup() {
@@ -240,7 +240,7 @@ public class SenderTest {
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000,
-                time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
+                time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
@@ -277,7 +277,7 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -324,7 +324,7 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
@@ -576,7 +576,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
 
-        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -618,7 +618,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
         SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
-        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -655,7 +655,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
 
-        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -684,7 +684,7 @@ public class SenderTest {
     public void testTransactionalSplitBatchAndSend() throws Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
         TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
-        TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 60000, 100);
+        TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -705,10 +705,10 @@ public class SenderTest {
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         try (Metrics m = new Metrics()) {
-            accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+            accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
                     new ApiVersions(), txnManager);
             SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
-            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
@@ -826,10 +826,10 @@ public class SenderTest {
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.metrics = new Metrics(metricConfig, time);
-        this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
                 apiVersions, transactionManager);
         this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
-        this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }


http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 1219b9c..53bba1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -120,7 +120,8 @@ public class TransactionManagerTest {
         int batchSize = 16 * 1024;
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.brokerNode = new Node(0, "localhost", 2211);
-        this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS);
+        this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
+                DEFAULT_RETRY_BACKOFF_MS);
         Metrics metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,