Repository: kafka Updated Branches: refs/heads/trunk e40e27b4e -> 3e6669000
MINOR: Eliminate PID terminology from non-test code. Producer id is used instead. Also refactored TransactionLog schema code to follow our naming convention and to have better structure. Author: Ismael Juma <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]> Closes #3041 from ijuma/eliminate-pid-terminology Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e666900 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e666900 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e666900 Branch: refs/heads/trunk Commit: 3e6669000f082808999a7216b00c4b0f5a94e1da Parents: e40e27b Author: Ismael Juma <[email protected]> Authored: Mon May 15 11:26:08 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Mon May 15 11:26:08 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/RecordAccumulator.java | 10 +- .../clients/producer/internals/Sender.java | 27 +-- .../producer/internals/TransactionManager.java | 10 +- .../apache/kafka/common/protocol/Errors.java | 4 +- .../kafka/common/record/DefaultRecordBatch.java | 4 +- .../common/record/MemoryRecordsBuilder.java | 10 +- .../apache/kafka/common/record/RecordBatch.java | 4 +- .../common/requests/AddOffsetsToTxnRequest.java | 6 +- .../requests/AddPartitionsToTxnRequest.java | 12 +- .../kafka/common/requests/EndTxnRequest.java | 12 +- .../kafka/common/requests/FetchResponse.java | 6 +- .../common/requests/TxnOffsetCommitRequest.java | 12 +- .../common/requests/WriteTxnMarkersRequest.java | 12 +- .../requests/WriteTxnMarkersResponse.java | 6 +- .../clients/producer/internals/SenderTest.java | 4 +- .../internals/TransactionManagerTest.java | 4 +- .../kafka/coordinator/group/GroupMetadata.scala | 2 +- .../transaction/TransactionLog.scala | 193 ++++++++++--------- ...nsactionMarkerRequestCompletionHandler.scala | 2 +- 
.../transaction/TransactionMetadata.scala | 6 +- .../transaction/TransactionStateManager.scala | 2 +- core/src/main/scala/kafka/log/Log.scala | 22 +-- core/src/main/scala/kafka/log/LogManager.scala | 4 +- .../scala/kafka/log/ProducerStateManager.scala | 22 +-- .../main/scala/kafka/server/KafkaConfig.scala | 8 +- .../kafka/tools/ConsumerOffsetChecker.scala | 20 +- .../scala/kafka/tools/DumpLogSegments.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 6 +- 29 files changed, 221 insertions(+), 213 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index cf3736c..d53c19d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -446,19 +446,19 @@ public final class RecordAccumulator { } else { ProducerIdAndEpoch producerIdAndEpoch = null; if (transactionManager != null) { - producerIdAndEpoch = transactionManager.pidAndEpoch(); + producerIdAndEpoch = transactionManager.producerIdAndEpoch(); if (!producerIdAndEpoch.isValid()) - // we cannot send the batch until we have refreshed the PID + // we cannot send the batch until we have refreshed the producer id break; } ProducerBatch batch = deque.pollFirst(); if (producerIdAndEpoch != null && !batch.inRetry()) { - // If the batch is in retry, then we should not change the pid and + // If the batch is in retry, then we should not change the producer id and // sequence number, since 
this may introduce duplicates. In particular, // the previous attempt may actually have been accepted, and if we change - // the pid and sequence here, this attempt will also be accepted, causing - // a duplicate. + // the producer id and sequence here, this attempt will also be accepted, + // causing a duplicate. int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition); log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}", node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 8b96b41..da09a1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -108,7 +108,7 @@ public class Sender implements Runnable { /* current request API versions supported by the known brokers */ private final ApiVersions apiVersions; - /* all the state related to transactions, in particular the PID, epoch, and sequence numbers */ + /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */ private final TransactionManager transactionManager; public Sender(KafkaClient client, @@ -197,7 +197,7 @@ public class Sender implements Runnable { private long sendProducerData(long now) { Cluster cluster = metadata.fetch(); - maybeWaitForPid(); + maybeWaitForProducerId(); // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); @@ -237,7 +237,7 @@ public class Sender implements Runnable { List<ProducerBatch> 
expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); boolean needsTransactionStateReset = false; - // Reset the PID if an expired batch has previously been sent to the broker. Also update the metrics + // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why // we need to reset the producer id here. for (ProducerBatch expiredBatch : expiredBatches) { @@ -370,8 +370,8 @@ public class Sender implements Runnable { return null; } - private void maybeWaitForPid() { - // If this is a transactional producer, the PID will be received when recovering transactions in the + private void maybeWaitForProducerId() { + // If this is a transactional producer, the producer id will be received when recovering transactions in the // initTransactions() method of the producer. if (transactionManager == null || transactionManager.isTransactional()) return; @@ -395,7 +395,7 @@ public class Sender implements Runnable { "We will back off and try again."); } } catch (Exception e) { - log.warn("Received an exception while trying to get a pid. Will back off and retry.", e); + log.warn("Received an exception while trying to get a producer id. Will back off and retry.", e); } log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs); time.sleep(retryBackoffMs); @@ -459,15 +459,16 @@ public class Sender implements Runnable { error); if (transactionManager == null) { reenqueueBatch(batch, now); - } else if (transactionManager.pidAndEpoch().producerId == batch.producerId() && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) { - // If idempotence is enabled only retry the request if the current PID is the same as the pid of the batch. 
+ } else if (transactionManager.producerIdAndEpoch().producerId == batch.producerId() && + transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) { + // If idempotence is enabled only retry the request if the current producer id is the same as the producer id of the batch. log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition, transactionManager.sequenceNumber(batch.topicPartition)); reenqueueBatch(batch, now); } else { failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " + "batch but the producer id changed from " + batch.producerId() + " to " + - transactionManager.pidAndEpoch().producerId + " in the mean time. This batch will be dropped.")); + transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped.")); this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } } else { @@ -476,7 +477,7 @@ public class Sender implements Runnable { exception = new TopicAuthorizationException(batch.topicPartition.topic()); else exception = error.exception(); - if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.pidAndEpoch().producerId) + if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " + "{} at offset {}. 
This indicates data loss on the broker, and should be investigated.", correlationId, batch.topicPartition, response.baseOffset); @@ -494,8 +495,8 @@ public class Sender implements Runnable { } else { completeBatch(batch, response); - if (transactionManager != null && transactionManager.pidAndEpoch().producerId == batch.producerId() - && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) { + if (transactionManager != null && transactionManager.producerIdAndEpoch().producerId == batch.producerId() + && transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) { transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition, transactionManager.sequenceNumber(batch.topicPartition)); @@ -519,7 +520,7 @@ public class Sender implements Runnable { private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) { if (transactionManager != null && !transactionManager.isTransactional() - && batch.producerId() == transactionManager.pidAndEpoch().producerId) { + && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) { // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees // about the previously committed message. Note that this will discard the producer id and sequence // numbers for all existing partitions. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 566ad7c..7e2f813 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -114,7 +114,7 @@ public class TransactionManager { // We use the priority to determine the order in which requests need to be sent out. For instance, if we have - // a pending FindCoordinator request, that must always go first. Next, If we need a PID, that must go second. + // a pending FindCoordinator request, that must always go first. Next, If we need a producer id, that must go second. // The endTxn request must always go last. private enum Priority { FIND_COORDINATOR(0), @@ -262,17 +262,17 @@ public class TransactionManager { } /** - * Get the current pid and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to + * Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to * verify that the result is valid. * * @return the current ProducerIdAndEpoch. */ - ProducerIdAndEpoch pidAndEpoch() { + ProducerIdAndEpoch producerIdAndEpoch() { return producerIdAndEpoch; } /** - * Set the pid and epoch atomically. + * Set the producer id and epoch atomically. */ void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) { this.producerIdAndEpoch = producerIdAndEpoch; @@ -291,7 +291,7 @@ public class TransactionManager { * messages will return an OutOfOrderSequenceException. 
* * Note that we can't reset the producer state for the transactional producer as this would mean bumping the epoch - * for the same pid. This might involve aborting the ongoing transaction during the initPidRequest, and the user + * for the same producer id. This might involve aborting the ongoing transaction during the initPidRequest, and the user * would not have any way of knowing this happened. So for the transactional producer, it's best to return the * produce error to the user and let them abort the transaction and close the producer explicitly. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 58a0a2a..a0922cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -439,8 +439,8 @@ public enum Errors { return new InvalidTxnStateException(message); } }), - INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producerId which is not currently assigned to " + - "its transactionalId", + INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producer id which is not currently assigned to " + + "its transactional id", new ApiExceptionBuilder() { @Override public ApiException build(String message) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index f321c3b..74bd3c0 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -361,7 +361,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe TimestampType timestampType, long baseTimestamp, long maxTimestamp, - long pid, + long producerId, short epoch, int sequence, boolean isTransactional, @@ -384,7 +384,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp); buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp); buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta); - buffer.putLong(position + PRODUCER_ID_OFFSET, pid); + buffer.putLong(position + PRODUCER_ID_OFFSET, producerId); buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch); buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence); buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords); http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 025b402..e52df76 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -95,7 +95,7 @@ public class MemoryRecordsBuilder { * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}. * @param baseOffset The initial offset to use for * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used. 
- * @param producerId The producer ID (PID) associated with the producer writing this record set + * @param producerId The producer ID associated with the producer writing this record set * @param producerEpoch The epoch of the producer * @param baseSequence The sequence number of the first record in this set * @param isTransactional Whether or not the records are part of a transaction @@ -212,15 +212,15 @@ public class MemoryRecordsBuilder { } } - public void setProducerState(long pid, short epoch, int baseSequence) { + public void setProducerState(long producerId, short epoch, int baseSequence) { if (isClosed()) { // Sequence numbers are assigned when the batch is closed while the accumulator is being drained. // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will - // be re queued. In this case, we should not attempt to set the state again, since changing the pid and sequence + // be re queued. In this case, we should not attempt to set the state again, since changing the producerId and sequence // once a batch has been sent to the broker risks introducing duplicates. throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client."); } - this.producerId = pid; + this.producerId = producerId; this.producerEpoch = epoch; this.baseSequence = baseSequence; } @@ -691,7 +691,7 @@ public class MemoryRecordsBuilder { } /** - * Return the ProducerId (PID) of the RecordBatches created by this builder. + * Return the producer id of the RecordBatches created by this builder. 
*/ public long producerId() { return this.producerId; http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index c984deb..42b0c2e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -132,9 +132,9 @@ public interface RecordBatch extends Iterable<Record> { byte magic(); /** - * Get the producer ID (PID) for this log record batch. For older magic versions, this will return 0. + * Get the producer id for this log record batch. For older magic versions, this will return 0. * - * @return The PID or -1 if there is none + * @return The producer id or -1 if there is none */ long producerId(); http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java index b017242..4bf8b3e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; public class AddOffsetsToTxnRequest extends AbstractRequest { private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String PID_KEY_NAME = "producer_id"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String EPOCH_KEY_NAME = "producer_epoch"; private static 
final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; @@ -68,7 +68,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest { public AddOffsetsToTxnRequest(Struct struct, short version) { super(version); this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.producerId = struct.getLong(PID_KEY_NAME); + this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); this.producerEpoch = struct.getShort(EPOCH_KEY_NAME); this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME); } @@ -93,7 +93,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest { protected Struct toStruct() { Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version())); struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(PID_KEY_NAME, producerId); + struct.set(PRODUCER_ID_KEY_NAME, producerId); struct.set(EPOCH_KEY_NAME, producerEpoch); struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId); return struct; http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java index 5bbea61..69ae25c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java @@ -29,8 +29,8 @@ import java.util.Map; public class AddPartitionsToTxnRequest extends AbstractRequest { private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String PID_KEY_NAME = "producer_id"; - private static final String EPOCH_KEY_NAME = "producer_epoch"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; + private static final String 
PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; @@ -72,8 +72,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { public AddPartitionsToTxnRequest(Struct struct, short version) { super(version); this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.producerId = struct.getLong(PID_KEY_NAME); - this.producerEpoch = struct.getShort(EPOCH_KEY_NAME); + this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); + this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); List<TopicPartition> partitions = new ArrayList<>(); Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME); @@ -107,8 +107,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { protected Struct toStruct() { Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version())); struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(PID_KEY_NAME, producerId); - struct.set(EPOCH_KEY_NAME, producerEpoch); + struct.set(PRODUCER_ID_KEY_NAME, producerId); + struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions); Object[] partitionsArray = new Object[mappedPartitions.size()]; http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java index 9c215be..ff9b82c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java @@ -24,8 +24,8 @@ import 
java.nio.ByteBuffer; public class EndTxnRequest extends AbstractRequest { private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String PID_KEY_NAME = "producer_id"; - private static final String EPOCH_KEY_NAME = "producer_epoch"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; + private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result"; public static class Builder extends AbstractRequest.Builder<EndTxnRequest> { @@ -64,8 +64,8 @@ public class EndTxnRequest extends AbstractRequest { public EndTxnRequest(Struct struct, short version) { super(version); this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.producerId = struct.getLong(PID_KEY_NAME); - this.producerEpoch = struct.getShort(EPOCH_KEY_NAME); + this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); + this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME)); } @@ -89,8 +89,8 @@ public class EndTxnRequest extends AbstractRequest { protected Struct toStruct() { Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version())); struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(PID_KEY_NAME, producerId); - struct.set(EPOCH_KEY_NAME, producerEpoch); + struct.set(PRODUCER_ID_KEY_NAME, producerId); + struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); struct.set(TRANSACTION_RESULT_KEY_NAME, result.id); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index db12d26..0cb87b5 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -56,7 +56,7 @@ public class FetchResponse extends AbstractResponse { private static final String RECORD_SET_KEY_NAME = "record_set"; // aborted transaction field names - private static final String PID_KEY_NAME = "producer_id"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String FIRST_OFFSET_KEY_NAME = "first_offset"; private static final int DEFAULT_THROTTLE_TIME = 0; @@ -211,7 +211,7 @@ public class FetchResponse extends AbstractResponse { abortedTransactions = new ArrayList<>(abortedTransactionsArray.length); for (Object abortedTransactionObj : abortedTransactionsArray) { Struct abortedTransactionStruct = (Struct) abortedTransactionObj; - long producerId = abortedTransactionStruct.getLong(PID_KEY_NAME); + long producerId = abortedTransactionStruct.getLong(PRODUCER_ID_KEY_NAME); long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME); abortedTransactions.add(new AbortedTransaction(producerId, firstOffset)); } @@ -339,7 +339,7 @@ public class FetchResponse extends AbstractResponse { List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size()); for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) { Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME); - abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.producerId); + abortedTransactionStruct.set(PRODUCER_ID_KEY_NAME, abortedTransaction.producerId); abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset); abortedTransactionStructs.add(abortedTransactionStruct); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index 8778b49..3f3024f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -28,8 +28,8 @@ import java.util.Map; public class TxnOffsetCommitRequest extends AbstractRequest { private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; - private static final String PID_KEY_NAME = "producer_id"; - private static final String EPOCH_KEY_NAME = "producer_epoch"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; + private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String RETENTION_TIME_KEY_NAME = "retention_time"; private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; private static final String TOPIC_KEY_NAME = "topic"; @@ -84,8 +84,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest { public TxnOffsetCommitRequest(Struct struct, short version) { super(version); this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME); - this.producerId = struct.getLong(PID_KEY_NAME); - this.producerEpoch = struct.getShort(EPOCH_KEY_NAME); + this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); + this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME); Map<TopicPartition, CommittedOffset> offsets = new HashMap<>(); @@ -128,8 +128,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest { protected Struct toStruct() { Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version())); struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId); - struct.set(PID_KEY_NAME, producerId); - 
struct.set(EPOCH_KEY_NAME, producerEpoch); + struct.set(PRODUCER_ID_KEY_NAME, producerId); + struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs); Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets); http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java index 0c09880..cf2c9fc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java @@ -33,8 +33,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest { private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch"; private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers"; - private static final String PID_KEY_NAME = "producer_id"; - private static final String EPOCH_KEY_NAME = "producer_epoch"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; + private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result"; private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; private static final String TOPIC_KEY_NAME = "topic"; @@ -138,8 +138,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest { for (Object markerObj : markersArray) { Struct markerStruct = (Struct) markerObj; - long producerId = markerStruct.getLong(PID_KEY_NAME); - short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME); + long producerId = markerStruct.getLong(PRODUCER_ID_KEY_NAME); + short producerEpoch = 
markerStruct.getShort(PRODUCER_EPOCH_KEY_NAME); int coordinatorEpoch = markerStruct.getInt(COORDINATOR_EPOCH_KEY_NAME); TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME)); @@ -172,8 +172,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest { int i = 0; for (TxnMarkerEntry entry : markers) { Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME); - markerStruct.set(PID_KEY_NAME, entry.producerId); - markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch); + markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId); + markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch); markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch); markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id); http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java index 00133a6..06f6662 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java @@ -29,7 +29,7 @@ import java.util.Map; public class WriteTxnMarkersResponse extends AbstractResponse { private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers"; - private static final String PID_KEY_NAME = "producer_id"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; private static final String PARTITIONS_KEY_NAME = "partitions"; private static final String TOPIC_KEY_NAME = "topic"; @@ -62,7 +62,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse { for (Object 
responseObj : responseArray) { Struct responseStruct = (Struct) responseObj; - long producerId = responseStruct.getLong(PID_KEY_NAME); + long producerId = responseStruct.getLong(PRODUCER_ID_KEY_NAME); Map<TopicPartition, Errors> errorPerPartition = new HashMap<>(); Object[] topicPartitionsArray = responseStruct.getArray(TOPIC_PARTITIONS_KEY_NAME); @@ -90,7 +90,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse { int k = 0; for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) { Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME); - responseStruct.set(PID_KEY_NAME, responseEntry.getKey()); + responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey()); Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue(); Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(partitionAndErrors); http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index bb13dcb..1321fba 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -387,8 +387,8 @@ public class SenderTest { }, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0)); sender.run(time.milliseconds()); assertTrue(transactionManager.hasProducerId()); - assertEquals(producerId, transactionManager.pidAndEpoch().producerId); - assertEquals((short) 0, transactionManager.pidAndEpoch().epoch); + assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId); + assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch); } 
@Test http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 53686e2..c0acfec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -286,8 +286,8 @@ public class TransactionManagerTest { assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed. assertTrue(transactionManager.hasProducerId()); - assertEquals(pid, transactionManager.pidAndEpoch().producerId); - assertEquals(epoch, transactionManager.pidAndEpoch().epoch); + assertEquals(pid, transactionManager.producerIdAndEpoch().producerId); + assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 8122694..2f76d63 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -146,7 +146,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState private val members = new mutable.HashMap[String, MemberMetadata] private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata] private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, 
OffsetAndMetadata] - // A map from a PID to the open offset commits for that pid. + // A map from a producer id to the open offset commits for that producer id. private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]() private var receivedTransactionalOffsetCommits = false private var receivedConsumerOffsetCommits = false http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index a180502..d0c9e87 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -30,12 +30,12 @@ import org.apache.kafka.common.record.CompressionType import scala.collection.mutable /* - * Messages stored for the transaction topic represent the pid and transactional status of the corresponding + * Messages stored for the transaction topic represent the producer id and transactional status of the corresponding * transactional id, which have versions for both the key and value fields. 
Key and value * versions are used to evolve the message formats: * * key version 0: [transactionalId] - * -> value version 0: [pid, epoch, expire_timestamp, status, [topic [partition], timestamp ] + * -> value version 0: [producer_id, producer_epoch, expire_timestamp, status, [topic [partition], timestamp] */ object TransactionLog { @@ -55,69 +55,73 @@ object TransactionLog { val EnforcedRequiredAcks: Short = (-1).toShort // log message formats - private val TXN_ID_KEY = "transactional_id" - - private val PID_KEY = "pid" - private val EPOCH_KEY = "epoch" - private val TXN_TIMEOUT_KEY = "transaction_timeout" - private val TXN_STATUS_KEY = "transaction_status" - private val TXN_PARTITIONS_KEY = "transaction_partitions" - private val TXN_ENTRY_TIMESTAMP_FIELD = "transaction_entry_timestamp" - private val TXN_START_TIMESTAMP_FIELD = "transaction_start_timestamp" - private val TOPIC_KEY = "topic" - private val PARTITION_IDS_KEY = "partition_ids" - - private val KEY_SCHEMA_V0 = new Schema(new Field(TXN_ID_KEY, STRING)) - private val KEY_SCHEMA_TXN_ID_FIELD = KEY_SCHEMA_V0.get(TXN_ID_KEY) - - private val VALUE_PARTITIONS_SCHEMA = new Schema(new Field(TOPIC_KEY, STRING), - new Field(PARTITION_IDS_KEY, new ArrayOf(INT32))) - private val PARTITIONS_SCHEMA_TOPIC_FIELD = VALUE_PARTITIONS_SCHEMA.get(TOPIC_KEY) - private val PARTITIONS_SCHEMA_PARTITION_IDS_FIELD = VALUE_PARTITIONS_SCHEMA.get(PARTITION_IDS_KEY) - - private val VALUE_SCHEMA_V0 = new Schema(new Field(PID_KEY, INT64, "Producer id in use by the transactional id."), - new Field(EPOCH_KEY, INT16, "Epoch associated with the producer id"), - new Field(TXN_TIMEOUT_KEY, INT32, "Transaction timeout in milliseconds"), - new Field(TXN_STATUS_KEY, INT8, - "TransactionState the transaction is in"), - new Field(TXN_PARTITIONS_KEY, ArrayOf.nullable(VALUE_PARTITIONS_SCHEMA), - "Set of partitions involved in the transaction"), - new Field(TXN_ENTRY_TIMESTAMP_FIELD, INT64, "Time the transaction was last updated"), - new 
Field(TXN_START_TIMESTAMP_FIELD, INT64, "Time the transaction was started")) - private val VALUE_SCHEMA_PID_FIELD = VALUE_SCHEMA_V0.get(PID_KEY) - private val VALUE_SCHEMA_EPOCH_FIELD = VALUE_SCHEMA_V0.get(EPOCH_KEY) - private val VALUE_SCHEMA_TXN_TIMEOUT_FIELD = VALUE_SCHEMA_V0.get(TXN_TIMEOUT_KEY) - private val VALUE_SCHEMA_TXN_STATUS_FIELD = VALUE_SCHEMA_V0.get(TXN_STATUS_KEY) - private val VALUE_SCHEMA_TXN_PARTITIONS_FIELD = VALUE_SCHEMA_V0.get(TXN_PARTITIONS_KEY) - private val VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD = VALUE_SCHEMA_V0.get(TXN_ENTRY_TIMESTAMP_FIELD) - private val VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD = VALUE_SCHEMA_V0.get(TXN_START_TIMESTAMP_FIELD) - - private val KEY_SCHEMAS = Map( - 0 -> KEY_SCHEMA_V0) - - private val VALUE_SCHEMAS = Map( - 0 -> VALUE_SCHEMA_V0) - - private val CURRENT_KEY_SCHEMA_VERSION = 0.toShort - private val CURRENT_VALUE_SCHEMA_VERSION = 0.toShort - - private val CURRENT_KEY_SCHEMA = schemaForKey(CURRENT_KEY_SCHEMA_VERSION) - - private val CURRENT_VALUE_SCHEMA = schemaForValue(CURRENT_VALUE_SCHEMA_VERSION) + + private object KeySchema { + private val TXN_ID_KEY = "transactional_id" + + private val V0 = new Schema(new Field(TXN_ID_KEY, STRING)) + private val SCHEMAS = Map(0 -> V0) + + val CURRENT_VERSION = 0.toShort + val CURRENT = schemaForKey(CURRENT_VERSION) + + val TXN_ID_FIELD = V0.get(TXN_ID_KEY) + + def ofVersion(version: Int): Option[Schema] = SCHEMAS.get(version) + } + + private object ValueSchema { + private val ProducerIdKey = "producer_id" + private val ProducerEpochKey = "producer_epoch" + private val TxnTimeoutKey = "transaction_timeout" + private val TxnStatusKey = "transaction_status" + private val TxnPartitionsKey = "transaction_partitions" + private val TxnEntryTimestampKey = "transaction_entry_timestamp" + private val TxnStartTimestampKey = "transaction_start_timestamp" + + private val PartitionIdsKey = "partition_ids" + private val TopicKey = "topic" + private val PartitionsSchema = new Schema(new 
Field(TopicKey, STRING), + new Field(PartitionIdsKey, new ArrayOf(INT32))) + + private val V0 = new Schema(new Field(ProducerIdKey, INT64, "Producer id in use by the transactional id."), + new Field(ProducerEpochKey, INT16, "Epoch associated with the producer id"), + new Field(TxnTimeoutKey, INT32, "Transaction timeout in milliseconds"), + new Field(TxnStatusKey, INT8, + "TransactionState the transaction is in"), + new Field(TxnPartitionsKey, ArrayOf.nullable(PartitionsSchema), + "Set of partitions involved in the transaction"), + new Field(TxnEntryTimestampKey, INT64, "Time the transaction was last updated"), + new Field(TxnStartTimestampKey, INT64, "Time the transaction was started")) + + private val Schemas = Map(0 -> V0) + + val CurrentVersion = 0.toShort + val Current = schemaForValue(CurrentVersion) + + val ProducerIdField = V0.get(ProducerIdKey) + val ProducerEpochField = V0.get(ProducerEpochKey) + val TxnTimeoutField = V0.get(TxnTimeoutKey) + val TxnStatusField = V0.get(TxnStatusKey) + val TxnPartitionsField = V0.get(TxnPartitionsKey) + val TxnEntryTimestampField = V0.get(TxnEntryTimestampKey) + val TxnStartTimestampField = V0.get(TxnStartTimestampKey) + + val PartitionsTopicField = PartitionsSchema.get(TopicKey) + val PartitionIdsField = PartitionsSchema.get(PartitionIdsKey) + + def ofVersion(version: Int): Option[Schema] = Schemas.get(version) + } private def schemaForKey(version: Int) = { - val schemaOpt = KEY_SCHEMAS.get(version) - schemaOpt match { - case Some(schema) => schema - case _ => throw new KafkaException(s"Unknown transaction log message key schema version $version") + KeySchema.ofVersion(version).getOrElse { + throw new KafkaException(s"Unknown transaction log message key schema version $version") } } private def schemaForValue(version: Int) = { - val schemaOpt = VALUE_SCHEMAS.get(version) - schemaOpt match { - case Some(schema) => schema - case _ => throw new KafkaException(s"Unknown transaction log message value schema version $version") + 
ValueSchema.ofVersion(version).getOrElse { + throw new KafkaException(s"Unknown transaction log message value schema version $version") } } @@ -127,11 +131,12 @@ object TransactionLog { * @return key bytes */ private[coordinator] def keyToBytes(transactionalId: String): Array[Byte] = { - val key = new Struct(CURRENT_KEY_SCHEMA) - key.set(KEY_SCHEMA_TXN_ID_FIELD, transactionalId) + import KeySchema._ + val key = new Struct(CURRENT) + key.set(TXN_ID_FIELD, transactionalId) val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf) - byteBuffer.putShort(CURRENT_KEY_SCHEMA_VERSION) + byteBuffer.putShort(CURRENT_VERSION) key.writeTo(byteBuffer) byteBuffer.array() } @@ -142,37 +147,38 @@ object TransactionLog { * @return value payload bytes */ private[coordinator] def valueToBytes(txnMetadata: TransactionMetadataTransition): Array[Byte] = { - val value = new Struct(CURRENT_VALUE_SCHEMA) - value.set(VALUE_SCHEMA_PID_FIELD, txnMetadata.producerId) - value.set(VALUE_SCHEMA_EPOCH_FIELD, txnMetadata.producerEpoch) - value.set(VALUE_SCHEMA_TXN_TIMEOUT_FIELD, txnMetadata.txnTimeoutMs) - value.set(VALUE_SCHEMA_TXN_STATUS_FIELD, txnMetadata.txnState.byte) - value.set(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD, txnMetadata.txnLastUpdateTimestamp) - value.set(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD, txnMetadata.txnStartTimestamp) + import ValueSchema._ + val value = new Struct(Current) + value.set(ProducerIdField, txnMetadata.producerId) + value.set(ProducerEpochField, txnMetadata.producerEpoch) + value.set(TxnTimeoutField, txnMetadata.txnTimeoutMs) + value.set(TxnStatusField, txnMetadata.txnState.byte) + value.set(TxnEntryTimestampField, txnMetadata.txnLastUpdateTimestamp) + value.set(TxnStartTimestampField, txnMetadata.txnStartTimestamp) if (txnMetadata.txnState == Empty) { if (txnMetadata.topicPartitions.nonEmpty) throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.txnState}: $txnMetadata") - 
value.set(VALUE_SCHEMA_TXN_PARTITIONS_FIELD, null) + value.set(TxnPartitionsField, null) } else { // first group the topic partitions by their topic names val topicAndPartitions = txnMetadata.topicPartitions.groupBy(_.topic()) val partitionArray = topicAndPartitions.map { case(topic, partitions) => - val topicPartitionsStruct = value.instance(VALUE_SCHEMA_TXN_PARTITIONS_FIELD) + val topicPartitionsStruct = value.instance(TxnPartitionsField) val partitionIds: Array[Integer] = partitions.map(topicPartition => Integer.valueOf(topicPartition.partition())).toArray - topicPartitionsStruct.set(PARTITIONS_SCHEMA_TOPIC_FIELD, topic) - topicPartitionsStruct.set(PARTITIONS_SCHEMA_PARTITION_IDS_FIELD, partitionIds) + topicPartitionsStruct.set(PartitionsTopicField, topic) + topicPartitionsStruct.set(PartitionIdsField, partitionIds) topicPartitionsStruct } - value.set(VALUE_SCHEMA_TXN_PARTITIONS_FIELD, partitionArray.toArray) + value.set(TxnPartitionsField, partitionArray.toArray) } val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) - byteBuffer.putShort(CURRENT_VALUE_SCHEMA_VERSION) + byteBuffer.putShort(CurrentVersion) value.writeTo(byteBuffer) byteBuffer.array() } @@ -187,8 +193,8 @@ object TransactionLog { val keySchema = schemaForKey(version) val key = keySchema.read(buffer) - if (version == CURRENT_KEY_SCHEMA_VERSION) { - val transactionalId = key.getString(KEY_SCHEMA_TXN_ID_FIELD) + if (version == KeySchema.CURRENT_VERSION) { + val transactionalId = key.getString(KeySchema.TXN_ID_FIELD) TxnKey(version, transactionalId) } else { @@ -197,37 +203,38 @@ object TransactionLog { } /** - * Decodes the transaction log messages' payload and retrieves pid metadata from it + * Decodes the transaction log messages' payload and retrieves the transaction metadata from it * - * @return a pid metadata object from the message + * @return a transaction metadata object from the message */ def readMessageValue(buffer: ByteBuffer): TransactionMetadata = { if (buffer == null) 
{ // tombstone null } else { + import ValueSchema._ val version = buffer.getShort val valueSchema = schemaForValue(version) val value = valueSchema.read(buffer) - if (version == CURRENT_VALUE_SCHEMA_VERSION) { - val pid = value.get(VALUE_SCHEMA_PID_FIELD).asInstanceOf[Long] - val epoch = value.get(VALUE_SCHEMA_EPOCH_FIELD).asInstanceOf[Short] - val timeout = value.get(VALUE_SCHEMA_TXN_TIMEOUT_FIELD).asInstanceOf[Int] + if (version == CurrentVersion) { + val producerId = value.getLong(ProducerIdField) + val epoch = value.getShort(ProducerEpochField) + val timeout = value.getInt(TxnTimeoutField) - val stateByte = value.getByte(VALUE_SCHEMA_TXN_STATUS_FIELD) + val stateByte = value.getByte(TxnStatusField) val state = TransactionMetadata.byteToState(stateByte) - val entryTimestamp = value.get(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD).asInstanceOf[Long] - val startTimestamp = value.get(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD).asInstanceOf[Long] + val entryTimestamp = value.getLong(TxnEntryTimestampField) + val startTimestamp = value.getLong(TxnStartTimestampField) - val transactionMetadata = new TransactionMetadata(pid, epoch, timeout, state, mutable.Set.empty[TopicPartition],startTimestamp, entryTimestamp) + val transactionMetadata = new TransactionMetadata(producerId, epoch, timeout, state, mutable.Set.empty[TopicPartition],startTimestamp, entryTimestamp) if (!state.equals(Empty)) { - val topicPartitionArray = value.getArray(VALUE_SCHEMA_TXN_PARTITIONS_FIELD) + val topicPartitionArray = value.getArray(TxnPartitionsField) topicPartitionArray.foreach { memberMetadataObj => val memberMetadata = memberMetadataObj.asInstanceOf[Struct] - val topic = memberMetadata.get(PARTITIONS_SCHEMA_TOPIC_FIELD).asInstanceOf[String] - val partitionIdArray = memberMetadata.getArray(PARTITIONS_SCHEMA_PARTITION_IDS_FIELD) + val topic = memberMetadata.getString(PartitionsTopicField) + val partitionIdArray = memberMetadata.getArray(PartitionIdsField) val topicPartitions = partitionIdArray.map { 
partitionIdObj => val partitionId = partitionIdObj.asInstanceOf[Integer] @@ -252,12 +259,12 @@ object TransactionLog { case txnKey: TxnKey => val transactionalId = txnKey.transactionalId val value = consumerRecord.value - val pidMetadata = + val producerIdMetadata = if (value == null) "NULL" else readMessageValue(ByteBuffer.wrap(value)) output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) output.write("::".getBytes(StandardCharsets.UTF_8)) - output.write(pidMetadata.toString.getBytes(StandardCharsets.UTF_8)) + output.write(producerIdMetadata.toString.getBytes(StandardCharsets.UTF_8)) output.write("\n".getBytes(StandardCharsets.UTF_8)) case _ => // no-op } @@ -265,7 +272,7 @@ object TransactionLog { } } -trait BaseKey{ +sealed trait BaseKey { def version: Short def transactionalId: Any } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 5978a97..39c7914 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -56,7 +56,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, val errors = writeTxnMarkerResponse.errors(txnMarker.producerId) if (errors == null) - throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for pid ${txnMarker.producerId}") + throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for producer id ${txnMarker.producerId}") txnStateManager.getTransactionState(transactionalId) match { case None => 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala index a76617e..d05676b 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala @@ -70,9 +70,9 @@ private[transaction] case object CompleteCommit extends TransactionState { val b private[transaction] case object CompleteAbort extends TransactionState { val byte: Byte = 5 } private[transaction] object TransactionMetadata { - def apply(pid: Long, epoch: Short, txnTimeoutMs: Int, timestamp: Long) = new TransactionMetadata(pid, epoch, txnTimeoutMs, Empty, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp) + def apply(producerId: Long, epoch: Short, txnTimeoutMs: Int, timestamp: Long) = new TransactionMetadata(producerId, epoch, txnTimeoutMs, Empty, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp) - def apply(pid: Long, epoch: Short, txnTimeoutMs: Int, state: TransactionState, timestamp: Long) = new TransactionMetadata(pid, epoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp) + def apply(producerId: Long, epoch: Short, txnTimeoutMs: Int, state: TransactionState, timestamp: Long) = new TransactionMetadata(producerId, epoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp) def byteToState(byte: Byte): TransactionState = { byte match { @@ -212,7 +212,7 @@ private[transaction] class TransactionMetadata(val producerId: Long, // metadata transition is valid only if all the following conditions are met: // // 1. the new state is already indicated in the pending state. - // 2. 
the pid is the same (i.e. this field should never be changed) + // 2. the producerId is the same (i.e. this field should never be changed) // 3. the epoch should be either the same value or old value + 1. // 4. the last update time is no smaller than the old value. // 4. the old partitions set is a subset of the new partitions set. http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 1106e7c..cf41fc3 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -104,7 +104,7 @@ class TransactionStateManager(brokerId: Int, } def enablePidExpiration() { - // TODO: add pid expiration logic + // TODO: add producer id expiration logic } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f499aa8..a4796d1 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -114,8 +114,8 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i * @param recoveryPoint The offset at which to begin recovery--i.e. 
the first offset which has not been flushed to disk * @param scheduler The thread pool scheduler used for background actions * @param time The time instance used for checking the clock - * @param maxPidExpirationMs The maximum amount of time to wait before a PID is considered expired - * @param pidExpirationCheckIntervalMs How often to check for PIDs which need to be expired + * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired + * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired */ @threadsafe class Log(@volatile var dir: File, @@ -124,8 +124,8 @@ class Log(@volatile var dir: File, @volatile var recoveryPoint: Long = 0L, scheduler: Scheduler, time: Time = Time.SYSTEM, - val maxPidExpirationMs: Int = 60 * 60 * 1000, - val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup { + val maxProducerIdExpirationMs: Int = 60 * 60 * 1000, + val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -149,7 +149,7 @@ class Log(@volatile var dir: File, /* The earliest offset which is part of an incomplete transaction. This is used to compute the LSO. 
*/ @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None - private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs) + private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] @@ -207,7 +207,7 @@ class Log(@volatile var dir: File, lock synchronized { producerStateManager.removeExpiredProducers(time.milliseconds) } - }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) + }, period = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) /** The name of this log */ def name = dir.getName() @@ -306,7 +306,7 @@ class Log(@volatile var dir: File, } private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized { - val stateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs) + val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds) logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment => val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset) @@ -625,7 +625,7 @@ class Log(@volatile var dir: File, segment.updateTxnIndex(completedTxn, lastStableOffset) } - // always update the last pid map offset so that the snapshot reflects the current offset + // always update the last producer id map offset so that the snapshot reflects the current offset // even if there isn't any idempotent data being written producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1) @@ -779,8 +779,8 @@ class Log(@volatile var dir: File, completedTxns: ListBuffer[CompletedTxn], lastEntry: Option[ProducerIdEntry], loadingFromLog: Boolean): Unit = { - 
val pid = batch.producerId - val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog)) + val producerId = batch.producerId + val appendInfo = producers.getOrElseUpdate(producerId, new ProducerAppendInfo(producerId, lastEntry, loadingFromLog)) val shouldValidateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME val maybeCompletedTxn = appendInfo.append(batch, shouldValidateSequenceNumbers) maybeCompletedTxn.foreach(completedTxns += _) @@ -1551,7 +1551,7 @@ object Log { new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix) /** - * Construct a PID snapshot file using the given offset. + * Construct a producer id snapshot file using the given offset. * * @param dir The directory in which the log will reside * @param offset The last offset (exclusive) included in the snapshot http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index af771f1..4ce4716 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -173,7 +173,7 @@ class LogManager(val logDirs: Array[File], config = config, logStartOffset = logStartOffset, recoveryPoint = logRecoveryPoint, - maxPidExpirationMs = maxPidExpirationMs, + maxProducerIdExpirationMs = maxPidExpirationMs, scheduler = scheduler, time = time) if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { @@ -414,7 +414,7 @@ class LogManager(val logDirs: Array[File], config = config, logStartOffset = 0L, recoveryPoint = 0L, - maxPidExpirationMs = maxPidExpirationMs, + maxProducerIdExpirationMs = maxPidExpirationMs, scheduler = scheduler, time = time) logs.put(topicPartition, log) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index d7b1c33..02609b2 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -122,7 +122,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc shouldValidateSequenceNumbers: Boolean): Unit = { if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog) // skip validation if this is the first entry when loading from the log. Log retention - // will generally have removed the beginning entries from each PID + // will generally have removed the beginning entries from each producer id validateAppend(epoch, firstSeq, lastSeq, shouldValidateSequenceNumbers) this.producerEpoch = epoch @@ -303,18 +303,18 @@ object ProducerStateManager { } /** - * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g. + * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g. * epoch, sequence number, last offset, etc.) * * The sequence number is the last number successfully appended to the partition for the given identifier. * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message * appended to the partition. * - * As long as a PID is contained in the map, the corresponding producer can continue to write data. - * However, PIDs can be expired due to lack of recent use or if the last written entry has been deleted from + * As long as a producer id is contained in the map, the corresponding producer can continue to write data. 
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure - * that the most recent entry from a given PID is retained in the log provided it hasn't expired due to - * age. This ensures that PIDs will not be expired until either the max expiration time has been reached, + * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to + * age. This ensures that producer ids will not be expired until either the max expiration time has been reached, * or if the topic also is configured for deletion, the segment containing the last written offset has * been deleted. */ @@ -415,7 +415,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.timestamp >= maxPidExpirationMs /** - * Expire any PIDs which have been idle longer than the configured maximum expiration timeout. + * Expire any producer ids which have been idle longer than the configured maximum expiration timeout. */ def removeExpiredProducers(currentTimeMs: Long) { producers.retain { case (producerId, lastEntry) => @@ -424,7 +424,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, } /** - * Truncate the PID mapping to the given offset range and reload the entries from the most recent + * Truncate the producer id mapping to the given offset range and reload the entries from the most recent * snapshot in range (if there is one). Note that the log end offset is assumed to be less than * or equal to the high watermark. 
*/ @@ -451,7 +451,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, */ def update(appendInfo: ProducerAppendInfo): Unit = { if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID) - throw new IllegalArgumentException("Invalid PID passed to update") + throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update") val entry = appendInfo.lastEntry producers.put(appendInfo.producerId, entry) @@ -465,7 +465,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, } /** - * Get the last written entry for the given PID. + * Get the last written entry for the given producer id. */ def lastEntry(producerId: Long): Option[ProducerIdEntry] = producers.get(producerId) @@ -532,7 +532,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, } /** - * Truncate the PID mapping and remove all snapshots. This resets the state of the mapping. + * Truncate the producer id mapping and remove all snapshots. This resets the state of the mapping. */ def truncate() { producers.clear() http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5ee4b12..99eddab 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -592,7 +592,7 @@ object KafkaConfig { val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " + "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction." val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic." 
- val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading pid and transactions into the cache." + val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache." val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " + "Internal topic creation will fail until the cluster size meets this replication factor requirement." val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)." @@ -610,9 +610,9 @@ object KafkaConfig { val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas" /** ********* Transaction Configuration ***********/ val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " + - "transaction coordinator. Note that this also influences PID expiration: PIDs are guaranteed to expire " + - "after expiration of this timeout from the last write by the PID (they may expire sooner if the last write " + - "from the PID is deleted due to the topic's retention settings)." + "transaction coordinator. Note that this also influences producer id expiration: Producer ids are guaranteed to expire " + + "after expiration of this timeout from the last write by the producer id (they may expire sooner if the last write " + + "from the producer id is deleted due to the topic's retention settings)." val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. 
This configuration accepts the standard compression codecs " + http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index eab3258..bcf2b58 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -54,36 +54,36 @@ object ConsumerOffsetChecker extends Logging { } private def processPartition(zkUtils: ZkUtils, - group: String, topic: String, pid: Int) { - val topicPartition = TopicAndPartition(topic, pid) + group: String, topic: String, producerId: Int) { + val topicPartition = TopicAndPartition(topic, producerId) val offsetOpt = offsetMap.get(topicPartition) val groupDirs = new ZKGroupTopicDirs(group, topic) - val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(pid))._1 - zkUtils.getLeaderForPartition(topic, pid) match { + val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(producerId))._1 + zkUtils.getLeaderForPartition(topic, producerId) match { case Some(bid) => val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid)) consumerOpt match { case Some(consumer) => - val topicAndPartition = TopicAndPartition(topic, pid) + val topicAndPartition = TopicAndPartition(topic, producerId) val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) - println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), + 
println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), owner match {case Some(ownerStr) => ownerStr case None => "none"})) case None => // ignore } case None => - println("No broker for partition %s - %s".format(topic, pid)) + println("No broker for partition %s - %s".format(topic, producerId)) } } private def processTopic(zkUtils: ZkUtils, group: String, topic: String) { topicPidMap.get(topic) match { - case Some(pids) => - pids.sorted.foreach { - pid => processPartition(zkUtils, group, topic, pid) + case Some(producerIds) => + producerIds.sorted.foreach { + producerId => processPartition(zkUtils, group, topic, producerId) } case None => // ignore } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 0b0ad7b..4d35a85 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -136,7 +136,7 @@ object DumpLogSegments { private def dumpTxnIndex(file: File): Unit = { val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file) for (abortedTxn <- index.allAbortedTxns) { - println(s"version: ${abortedTxn.version} pid: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " + + println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " + s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala index c12f774..fc78501 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -64,7 +64,7 @@ object ZkUtils { val BrokerSequenceIdPath = s"$BrokersPath/seqid" val ConfigChangesPath = s"$ConfigPath/changes" val ConfigUsersPath = s"$ConfigPath/users" - val ProducerIdBlockPath = "/latest_pid_block" + val ProducerIdBlockPath = "/latest_producer_id_block" // Important: it is necessary to add any new top level Zookeeper path to the Seq val SecureZkRootPaths = Seq(AdminPath, BrokersPath, http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index aaef466..e545255 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -98,7 +98,7 @@ class LogTest { LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, - maxPidExpirationMs = 24 * 60, + maxProducerIdExpirationMs = 24 * 60, scheduler = time.scheduler, time = time) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) @@ -2436,8 +2436,8 @@ class LogTest { recoveryPoint = 0L, scheduler = time.scheduler, time = time, - maxPidExpirationMs = maxPidExpirationMs, - pidExpirationCheckIntervalMs = pidExpirationCheckIntervalMs) + maxProducerIdExpirationMs = maxPidExpirationMs, + producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs) log }
