Repository: kafka Updated Branches: refs/heads/0.11.0 0be4d1af0 -> 4424534e9
MINOR: Logging/debugging improvements for transactions Author: Jason Gustafson <[email protected]> Author: Apurva Mehta <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]> Closes #3185 from hachikuji/minor-transaction-logging-improvements (cherry picked from commit 0c3e466eb035859659ce41404f3b71b577467dca) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4424534e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4424534e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4424534e Branch: refs/heads/0.11.0 Commit: 4424534e99a92f54a873771dc3f05f23f3487669 Parents: 0be4d1a Author: Jason Gustafson <[email protected]> Authored: Thu Jun 1 19:08:32 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Thu Jun 1 19:08:52 2017 -0700 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 11 ++- .../producer/internals/ProducerIdAndEpoch.java | 5 ++ .../producer/internals/RecordAccumulator.java | 6 +- .../clients/producer/internals/Sender.java | 35 ++++----- .../producer/internals/TransactionManager.java | 74 +++++++++----------- .../requests/AddOffsetsToTxnResponse.java | 9 +++ .../requests/AddPartitionsToTxnResponse.java | 9 +++ .../kafka/common/requests/EndTxnResponse.java | 8 +++ .../kafka/common/requests/FetchResponse.java | 7 +- .../requests/FindCoordinatorResponse.java | 4 +- .../common/requests/InitProducerIdResponse.java | 9 +++ .../requests/TxnOffsetCommitResponse.java | 8 +++ .../transaction/TransactionCoordinator.scala | 13 ++-- core/src/main/scala/kafka/log/Log.scala | 7 +- .../scala/kafka/tools/DumpLogSegments.scala | 36 ++++++++-- 15 files changed, 157 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index c2beff8..defbbb7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -223,6 +223,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { TopicPartition partition = entry.getKey(); long fetchOffset = request.fetchData().get(partition).fetchOffset; FetchResponse.PartitionData fetchData = entry.getValue(); + + log.debug("Fetch at offset {} for partition {} returned fetch data {}", fetchOffset, + partition, fetchData); + completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion())); } @@ -232,7 +236,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @Override public void onFailure(RuntimeException e) { - log.debug("Fetch request to {} for partitions {} failed", fetchTarget, request.fetchData().keySet(), e); + log.debug("Fetch request {} to {} failed", request.fetchData(), fetchTarget, e); } }); } @@ -792,8 +796,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { Map<Node, FetchRequest.Builder> requests = new HashMap<>(); for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) { Node node = entry.getKey(); - FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, entry.getValue(), isolationLevel). - setMaxBytes(this.maxBytes); + FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, + entry.getValue(), isolationLevel) + .setMaxBytes(this.maxBytes); requests.put(node, fetch); } return requests; http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java index 01d5e86..293bb51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java @@ -33,4 +33,9 @@ class ProducerIdAndEpoch { public boolean isValid() { return NO_PRODUCER_ID < producerId; } + + @Override + public String toString() { + return "(producerId=" + producerId + ", epoch='" + epoch + ")"; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 3f9f4b1..2c4917d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -488,9 +488,9 @@ public final class RecordAccumulator { // the producer id and sequence here, this attempt will also be accepted, // causing a duplicate. int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition); - log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}", - node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, - batch.topicPartition, sequenceNumber); + log.debug("Assigning sequence number {} from producer {} to dequeued " + + "batch from partition {} bound for {}.", + sequenceNumber, producerIdAndEpoch, batch.topicPartition, node); batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional); } batch.close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 4f1c7d4..8b3957f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -217,7 +217,6 @@ public class Sender implements Runnable { } long pollTimeout = sendProducerData(now); - log.trace("waiting {}ms in poll", pollTimeout); client.poll(pollTimeout, now); } @@ -298,7 +297,6 @@ public class Sender implements Runnable { } private boolean maybeSendTransactionalRequest(long now) { - String transactionalId = transactionManager.transactionalId(); if (transactionManager.isCompletingTransaction() && !transactionManager.hasPartitionsToAdd() && accumulator.hasUnflushedBatches()) { @@ -315,22 +313,15 @@ public class Sender implements Runnable { accumulator.beginFlush(); // Do not send the EndTxn until all pending batches have been completed - if (accumulator.hasUnflushedBatches()) { - log.trace("TransactionalId: {} -- Waiting for pending batches to be flushed before completing transaction", - transactionalId); + if (accumulator.hasUnflushedBatches()) return false; - } } TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(); - if (nextRequestHandler == null) { - log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionalId); + if (nextRequestHandler == null) return false; - } AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder(); - log.trace("TransactionalId: {} -- Preparing to send request {}", transactionalId, requestBuilder); - while (true) { Node targetNode = null; try { @@ -340,6 +331,7 @@ public class Sender implements Runnable { transactionManager.lookupCoordinator(nextRequestHandler); break; } + if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) { transactionManager.lookupCoordinator(nextRequestHandler); break; @@ -347,33 +339,30 @@ public class Sender implements Runnable { } else { targetNode = awaitLeastLoadedNodeReady(requestTimeout); } + if (targetNode != null) { - if (nextRequestHandler.isRetry()) { - log.trace("TransactionalId: {} -- Waiting {}ms before resending request {}", - transactionalId, - retryBackoffMs, requestBuilder); + if (nextRequestHandler.isRetry()) time.sleep(retryBackoffMs); - } + ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), requestBuilder, now, true, nextRequestHandler); transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId()); - log.debug("TransactionalId: {} -- Sending transactional request {} to node {}", - transactionalId, requestBuilder, clientRequest.destination()); + log.debug("{}Sending transactional request {} to node {}", + transactionManager.logPrefix, requestBuilder, targetNode); + client.send(clientRequest, now); return true; } } catch (IOException e) { - log.debug("TransactionalId: {} -- Disconnect from {} while trying to send request {}. Going " + - "to back off and retry", transactionalId, targetNode, requestBuilder); + log.debug("{}Disconnect from {} while trying to send request {}. Going " + + "to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder); } - log.trace("TransactionalId: {} -- About to wait for {}ms before trying to send another request.", - transactionalId, retryBackoffMs); + time.sleep(retryBackoffMs); metadata.requestUpdate(); } transactionManager.retry(nextRequestHandler); - return true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 11068a7..9d9deac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -64,6 +64,7 @@ public class TransactionManager { private final String transactionalId; private final int transactionTimeoutMs; + public final String logPrefix; private final Map<TopicPartition, Integer> sequenceNumbers; private final PriorityQueue<TxnRequestHandler> pendingRequests; @@ -136,6 +137,7 @@ public class TransactionManager { this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH); this.sequenceNumbers = new HashMap<>(); this.transactionalId = transactionalId; + this.logPrefix = transactionalId == null ? "" : "[TransactionalId " + transactionalId + "] "; this.transactionTimeoutMs = transactionTimeoutMs; this.transactionCoordinator = null; this.consumerGroupCoordinator = null; @@ -162,7 +164,7 @@ public class TransactionManager { this.sequenceNumbers.clear(); InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); InitProducerIdHandler handler = new InitProducerIdHandler(builder); - pendingRequests.add(handler); + enqueueRequest(handler); return handler.result; } @@ -191,15 +193,14 @@ public class TransactionManager { } private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) { - if (!newPartitionsInTransaction.isEmpty()) { - pendingRequests.add(addPartitionsToTransactionHandler()); - } + if (!newPartitionsInTransaction.isEmpty()) + enqueueRequest(addPartitionsToTransactionHandler()); TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT; EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, transactionResult); EndTxnHandler handler = new EndTxnHandler(builder); - pendingRequests.add(handler); + enqueueRequest(handler); return handler.result; } @@ -214,7 +215,7 @@ public class TransactionManager { AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId); AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets); - pendingRequests.add(handler); + enqueueRequest(handler); return handler.result; } @@ -304,6 +305,8 @@ public class TransactionManager { * Set the producer id and epoch atomically. */ void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) { + log.info("{}ProducerId set to {} with epoch {}", logPrefix, producerIdAndEpoch.producerId, + producerIdAndEpoch.epoch); this.producerIdAndEpoch = producerIdAndEpoch; } @@ -355,35 +358,35 @@ public class TransactionManager { synchronized TxnRequestHandler nextRequestHandler() { if (!newPartitionsInTransaction.isEmpty()) - pendingRequests.add(addPartitionsToTransactionHandler()); + enqueueRequest(addPartitionsToTransactionHandler()); TxnRequestHandler nextRequestHandler = pendingRequests.poll(); if (nextRequestHandler != null && maybeTerminateRequestWithError(nextRequestHandler)) { - log.trace("TransactionalId: {} -- Not sending transactional request {} because we are in an error state", - transactionalId, nextRequestHandler.requestBuilder()); + log.trace("{}Not sending transactional request {} because we are in an error state", + logPrefix, nextRequestHandler.requestBuilder()); return null; } if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) { nextRequestHandler.result.done(); if (currentState != State.FATAL_ERROR) { - log.debug("TransactionId: {} -- Not sending EndTxn for completed transaction since no partitions " + - "or offsets were successfully added", transactionalId); + log.debug("{}Not sending EndTxn for completed transaction since no partitions " + + "or offsets were successfully added", logPrefix); completeTransaction(); } - return pendingRequests.poll(); + nextRequestHandler = pendingRequests.poll(); } + + if (nextRequestHandler != null) + log.trace("{}Request {} dequeued for sending", logPrefix, nextRequestHandler.requestBuilder()); + return nextRequestHandler; } synchronized void retry(TxnRequestHandler request) { request.setRetry(); - pendingRequests.add(request); - } - - synchronized void reenqueue(TxnRequestHandler request) { - pendingRequests.add(request); + enqueueRequest(request); } Node coordinator(FindCoordinatorRequest.CoordinatorType type) { @@ -445,7 +448,7 @@ public class TransactionManager { lastError = null; } - log.debug("TransactionalId {} -- Transition from state {} to {}", transactionalId, currentState, target); + log.debug("{}Transition from state {} to {}", logPrefix, currentState, target); currentState = target; } @@ -474,6 +477,11 @@ public class TransactionManager { return false; } + private void enqueueRequest(TxnRequestHandler requestHandler) { + log.debug("{}Enqueuing transactional request {}", logPrefix, requestHandler.requestBuilder()); + pendingRequests.add(requestHandler); + } + private synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) { switch (type) { case GROUP: @@ -487,7 +495,7 @@ public class TransactionManager { } FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey); - pendingRequests.add(new FindCoordinatorHandler(builder)); + enqueueRequest(new FindCoordinatorHandler(builder)); } private synchronized void completeTransaction() { @@ -550,7 +558,7 @@ public class TransactionManager { void reenqueue() { synchronized (TransactionManager.this) { this.isRetry = true; - pendingRequests.add(this); + enqueueRequest(this); } } @@ -562,12 +570,13 @@ public class TransactionManager { } else { clearInFlightRequestCorrelationId(); if (response.wasDisconnected()) { - log.trace("disconnected from " + response.destination() + ". Will retry."); + log.trace("{}Disconnected from {}. Will retry.", logPrefix, response.destination()); reenqueue(); } else if (response.versionMismatch() != null) { fatalError(response.versionMismatch()); } else if (response.hasResponse()) { - log.trace("Got transactional response for request:" + requestBuilder()); + log.trace("{}Received transactional response {} for request {}", logPrefix, + response.responseBody(), requestBuilder()); synchronized (TransactionManager.this) { handleResponse(response.responseBody()); } @@ -630,9 +639,6 @@ public class TransactionManager { InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response; Errors error = initProducerIdResponse.error(); - log.debug("TransactionalId {} -- Received InitProducerId response with error {}", - transactionalId, error); - if (error == Errors.NONE) { ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); setProducerIdAndEpoch(producerIdAndEpoch); @@ -676,9 +682,6 @@ public class TransactionManager { boolean hasPartitionErrors = false; Set<String> unauthorizedTopics = new HashSet<>(); - log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}", - transactionalId, errors); - for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors.entrySet()) { TopicPartition topicPartition = topicPartitionErrorEntry.getKey(); Errors error = topicPartitionErrorEntry.getValue(); @@ -706,8 +709,7 @@ public class TransactionManager { } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { unauthorizedTopics.add(topicPartition.topic()); } else { - log.error("TransactionalId: {} -- Could not add partition {} due to unexpected error {}", - transactionalId, topicPartition, error); + log.error("{}Could not add partition {} due to unexpected error {}", logPrefix, topicPartition, error); hasPartitionErrors = true; } } @@ -758,9 +760,6 @@ public class TransactionManager { FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; Errors error = findCoordinatorResponse.error(); - log.debug("TransactionalId {} -- Received FindCoordinator response with error {}", - transactionalId, error); - if (error == Errors.NONE) { Node node = findCoordinatorResponse.node(); switch (builder.coordinatorType()) { @@ -812,9 +811,6 @@ public class TransactionManager { EndTxnResponse endTxnResponse = (EndTxnResponse) response; Errors error = endTxnResponse.error(); - log.debug("TransactionalId {} -- Received EndTxn response with error {}", - transactionalId, error); - if (error == Errors.NONE) { completeTransaction(); result.done(); @@ -860,9 +856,6 @@ public class TransactionManager { AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response; Errors error = addOffsetsToTxnResponse.error(); - log.debug("TransactionalId {} -- Received AddOffsetsToTxn response with error {}", - transactionalId, error); - if (error == Errors.NONE) { // note the result is not completed until the TxnOffsetCommit returns pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId())); @@ -920,9 +913,6 @@ public class TransactionManager { boolean hadFailure = false; Map<TopicPartition, Errors> errors = txnOffsetCommitResponse.errors(); - log.debug("TransactionalId {} -- Received TxnOffsetCommit response with errors {}", - transactionalId, errors); - for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) { TopicPartition topicPartition = entry.getKey(); Errors error = entry.getValue(); http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java index 0536636..981a234 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java @@ -67,4 +67,13 @@ public class AddOffsetsToTxnResponse extends AbstractResponse { public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) { return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer)); } + + @Override + public String toString() { + return "AddOffsetsToTxnResponse(" + + "error=" + error + + ", throttleTimeMs=" + throttleTimeMs + + ')'; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java index 4112b93..f05310a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java @@ -105,4 +105,13 @@ public class AddPartitionsToTxnResponse extends AbstractResponse { public static AddPartitionsToTxnResponse parse(ByteBuffer buffer, short version) { return new AddPartitionsToTxnResponse(ApiKeys.ADD_PARTITIONS_TO_TXN.parseResponse(version, buffer)); } + + @Override + public String toString() { + return "AddPartitionsToTxnResponse(" + + "errors=" + errors + + ", throttleTimeMs=" + throttleTimeMs + + ')'; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java index 9083808..47a6623 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java @@ -66,4 +66,12 @@ public class EndTxnResponse extends AbstractResponse { public static EndTxnResponse parse(ByteBuffer buffer, short version) { return new EndTxnResponse(ApiKeys.END_TXN.parseResponse(version, buffer)); } + + @Override + public String toString() { + return "EndTxnResponse(" + + "error=" + error + + ", throttleTimeMs=" + throttleTimeMs + + ')'; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 96fee43..824a76f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -162,11 +162,14 @@ public class FetchResponse extends AbstractResponse { @Override public String toString() { - return "(error=" + error + ", highWaterMark=" + highWatermark + + return "(error=" + error + + ", highWaterMark=" + highWatermark + ", lastStableOffset = " + lastStableOffset + ", logStartOffset = " + logStartOffset + - ", abortedTransactions = " + abortedTransactions + ", records=" + records + ")"; + ", abortedTransactions = " + abortedTransactions + + ", recordsSizeInBytes=" + records.sizeInBytes() + ")"; } + } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index 11eed1d..ae6986a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -108,11 +108,11 @@ public class FindCoordinatorResponse extends AbstractResponse { @Override public String toString() { - return "FindCoordinatorResponse{" + + return "FindCoordinatorResponse(" + "throttleTimeMs=" + throttleTimeMs + ", errorMessage='" + errorMessage + '\'' + ", error=" + error + ", node=" + node + - '}'; + ')'; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java index da5e6e5..88fb09c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java @@ -87,4 +87,13 @@ public class InitProducerIdResponse extends AbstractResponse { return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer)); } + @Override + public String toString() { + return "InitProducerIdResponse(" + + "error=" + error + + ", producerId=" + producerId + + ", producerEpoch=" + epoch + + ", throttleTimeMs=" + throttleTimeMs + + ')'; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java index 4c0f010..9a1cefa 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java @@ -109,4 +109,12 @@ public class TxnOffsetCommitResponse extends AbstractResponse { return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer)); } + @Override + public String toString() { + return "TxnOffsetCommitResponse(" + + "errors=" + errors + + ", throttleTimeMs=" + throttleTimeMs + + ')'; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 44e32b1..5c39635 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager} import kafka.utils.{Logging, Scheduler, ZkUtils} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch @@ -144,8 +145,8 @@ class TransactionCoordinator(brokerId: Int, } result match { - case Left(producerIdResult) => - responseCallback(producerIdResult) + case Left(error) => + responseCallback(error) case Right((coordinatorEpoch, newMetadata)) => if (newMetadata.txnState == Ongoing) { @@ -165,10 +166,14 @@ class TransactionCoordinator(brokerId: Int, sendRetriableErrorCallback) } else { def sendPidResponseCallback(error: Errors): Unit = { - if (error == Errors.NONE) + if (error == Errors.NONE) { + info(s"Initialized transactionalId $transactionalId with producerId ${newMetadata.producerId} and producer " + + s"epoch ${newMetadata.producerEpoch} on partition " + + s"${Topic.TRANSACTION_STATE_TOPIC_NAME}-${txnManager.partitionFor(transactionalId)}") responseCallback(initTransactionMetadata(newMetadata)) - else + } else { responseCallback(initTransactionError(error)) + } } txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback) http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index c37ea08..b9968a2 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -674,7 +674,7 @@ class Log(@volatile var dir: File, } private def updateFirstUnstableOffset(): Unit = lock synchronized { - this.firstUnstableOffset = producerStateManager.firstUnstableOffset match { + val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match { case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly => val offset = logOffsetMetadata.messageOffset val segment = segments.floorEntry(offset).getValue @@ -682,6 +682,11 @@ class Log(@volatile var dir: File, Some(LogOffsetMetadata(offset, segment.baseOffset, position.position)) case other => other } + + if (updatedFirstStableOffset != this.firstUnstableOffset) { + debug(s"First unstable offset for ${this.name} updated to $updatedFirstStableOffset") + this.firstUnstableOffset = updatedFirstStableOffset + } } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/4424534e/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 4b38c27..3680d10 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import joptsimple.OptionParser import kafka.coordinator.group.{GroupMetadataKey, GroupMetadataManager, OffsetKey} +import kafka.coordinator.transaction.TransactionLog import kafka.log._ import kafka.serializer.Decoder import kafka.utils._ @@ -60,8 +61,10 @@ object DumpLogSegments { .withOptionalArg() .ofType(classOf[java.lang.String]) .defaultsTo("kafka.serializer.StringDecoder") - val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from __consumer_offsets topic.") - + val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from the " + + "__consumer_offsets topic.") + val transactionLogOpt = parser.accepts("transaction-log-decoder", "if set, log data will be parsed as " + + "transaction metadata from the __transaction_state topic.") if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.") @@ -70,7 +73,11 @@ object DumpLogSegments { CommandLineUtils.checkRequiredArgs(parser, options, filesOpt) - val printDataLog = options.has(printOpt) || options.has(offsetsOpt) || options.has(valueDecoderOpt) || options.has(keyDecoderOpt) + val printDataLog = options.has(printOpt) || + options.has(offsetsOpt) || + options.has(transactionLogOpt) || + options.has(valueDecoderOpt) || + options.has(keyDecoderOpt) val verifyOnly = options.has(verifyOpt) val indexSanityOnly = options.has(indexSanityOpt) @@ -80,6 +87,8 @@ object DumpLogSegments { val messageParser = if (options.has(offsetsOpt)) { new OffsetsMessageParser + } else if (options.has(transactionLogOpt)) { + new TransactionLogMessageParser } else { val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties) val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties) @@ -263,6 +272,24 @@ object DumpLogSegments { } } + private class TransactionLogMessageParser extends MessageParser[String, String] { + + override def parse(record: Record): (Option[String], Option[String]) = { + val txnKey = TransactionLog.readTxnRecordKey(record.key) + val txnMetadata = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) + + val keyString = s"transactionalId=${txnKey.transactionalId}" + val valueString = s"producerId:${txnMetadata.producerId}," + + s"producerEpoch:${txnMetadata.producerEpoch}," + + s"state=${txnMetadata.state}," + + s"partitions=${txnMetadata.topicPartitions}," + + s"lastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}" + + (Some(keyString), Some(valueString)) + } + + } + private class OffsetsMessageParser extends MessageParser[String, String] { private def hex(bytes: Array[Byte]): String = { if (bytes.isEmpty) @@ -356,7 +383,8 @@ object DumpLogSegments { " compresscodec: " + batch.compressionType) if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { - print(" crc: " + batch.checksum + " sequence: " + record.sequence + + print(" producerId: " + batch.producerId + " sequence: " + record.sequence + + " isTransactional: " + batch.isTransactional + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]")) } else { print(" crc: " + record.checksumOrNull)
