MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse
Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #2997 from hachikuji/minor-rename-initpid Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1c8e7d9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1c8e7d9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1c8e7d9 Branch: refs/heads/trunk Commit: a1c8e7d941ad9c765dac232435a297f905eeeed5 Parents: 1cb39f7 Author: Jason Gustafson <[email protected]> Authored: Fri May 12 19:59:34 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri May 12 19:59:34 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../clients/producer/internals/PidAndEpoch.java | 36 ------ .../producer/internals/ProducerBatch.java | 4 +- .../producer/internals/ProducerIdAndEpoch.java | 36 ++++++ .../producer/internals/RecordAccumulator.java | 12 +- .../clients/producer/internals/Sender.java | 24 ++-- .../producer/internals/TransactionManager.java | 64 +++++----- .../errors/InvalidTxnTimeoutException.java | 2 +- .../apache/kafka/common/protocol/Errors.java | 3 +- .../kafka/common/requests/AbstractRequest.java | 2 +- .../kafka/common/requests/AbstractResponse.java | 2 +- .../kafka/common/requests/InitPidRequest.java | 104 ---------------- .../kafka/common/requests/InitPidResponse.java | 89 -------------- .../common/requests/InitProducerIdRequest.java | 104 ++++++++++++++++ .../common/requests/InitProducerIdResponse.java | 89 ++++++++++++++ .../clients/producer/internals/SenderTest.java | 20 ++-- .../internals/TransactionManagerTest.java | 26 ++-- .../common/requests/RequestResponseTest.java | 8 +- .../transaction/ProducerIdManager.scala | 119 ++++++++++--------- .../transaction/TransactionCoordinator.scala | 90 +++++++------- .../TransactionMarkerChannelManager.scala | 7 +- .../scala/kafka/log/ProducerStateManager.scala | 84 ++++++------- .../src/main/scala/kafka/server/KafkaApis.scala | 29 +++-- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +- .../transaction/ProducerIdManagerTest.scala | 14 +-- .../TransactionCoordinatorIntegrationTest.scala | 16 +-- .../TransactionCoordinatorTest.scala | 50 ++++---- .../unit/kafka/server/RequestQuotaTest.scala | 4 +- 29 files changed, 525 insertions(+), 523 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b1f405a..05edf65 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -688,7 +688,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (transactionManager == null) return; - if (transactionManager.isTransactional() && !transactionManager.hasPid()) + if (transactionManager.isTransactional() && !transactionManager.hasProducerId()) throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled."); if (transactionManager.isFenced()) http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java deleted file mode 100644 index 8647a7b..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.producer.internals; - -import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH; -import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID; - -class PidAndEpoch { - static final PidAndEpoch NONE = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH); - - public final long producerId; - public final short epoch; - - PidAndEpoch(long producerId, short epoch) { - this.producerId = producerId; - this.epoch = epoch; - } - - public boolean isValid() { - return NO_PRODUCER_ID < producerId; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index f5fe8e6..3c5965a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -231,8 +231,8 @@ public final class ProducerBatch { return recordsBuilder.isFull(); } - public void setProducerState(PidAndEpoch pidAndEpoch, int baseSequence) { - recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence); + public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) { + recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java new file mode 100644 index 0000000..01d5e86 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH; +import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID; + +class ProducerIdAndEpoch { + static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH); + + public final long producerId; + public final short epoch; + + ProducerIdAndEpoch(long producerId, short epoch) { + this.producerId = producerId; + this.epoch = epoch; + } + + public boolean isValid() { + return NO_PRODUCER_ID < producerId; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 4ffab0a..cf3736c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -444,16 +444,16 @@ public final class RecordAccumulator { // request break; } else { - PidAndEpoch pidAndEpoch = null; + ProducerIdAndEpoch producerIdAndEpoch = null; if (transactionManager != null) { - pidAndEpoch = transactionManager.pidAndEpoch(); - if (!pidAndEpoch.isValid()) + producerIdAndEpoch = transactionManager.pidAndEpoch(); + if (!producerIdAndEpoch.isValid()) // we cannot send the batch until we have refreshed the PID break; } ProducerBatch batch = deque.pollFirst(); - if (pidAndEpoch != null && !batch.inRetry()) { + if (producerIdAndEpoch != null && !batch.inRetry()) { // If the batch is in retry, then we should not change the pid and // sequence number, since this may introduce duplicates. In particular, // the previous attempt may actually have been accepted, and if we change @@ -461,9 +461,9 @@ public final class RecordAccumulator { // a duplicate. int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition); log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}", - node, pidAndEpoch.producerId, pidAndEpoch.epoch, + node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, batch.topicPartition, sequenceNumber); - batch.setProducerState(pidAndEpoch, sequenceNumber); + batch.setProducerState(producerIdAndEpoch, sequenceNumber); } batch.close(); size += batch.sizeInBytes(); http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 4d95ac0..8b96b41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -42,8 +42,8 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.clients.NetworkClientUtils; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.requests.InitPidRequest; -import org.apache.kafka.common.requests.InitPidResponse; +import org.apache.kafka.common.requests.InitProducerIdRequest; +import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.Time; @@ -357,7 +357,7 @@ public class Sender implements Runnable { private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException { String nodeId = node.idString(); - InitPidRequest.Builder builder = new InitPidRequest.Builder(null); + InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null); ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null); return NetworkClientUtils.sendAndReceive(client, request, time); } @@ -376,28 +376,28 @@ public class Sender implements Runnable { if (transactionManager == null || transactionManager.isTransactional()) return; - while (!transactionManager.hasPid()) { + while (!transactionManager.hasProducerId()) { try { Node node = awaitLeastLoadedNodeReady(requestTimeout); if (node != null) { ClientResponse response = sendAndAwaitInitPidRequest(node); - if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) { - InitPidResponse initPidResponse = (InitPidResponse) response.responseBody(); - PidAndEpoch pidAndEpoch = new PidAndEpoch( - initPidResponse.producerId(), initPidResponse.epoch()); - transactionManager.setPidAndEpoch(pidAndEpoch); + if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) { + InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody(); + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch( + initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); + transactionManager.setProducerIdAndEpoch(producerIdAndEpoch); } else { - log.error("Received an unexpected response type for an InitPidRequest from {}. " + + log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " + "We will back off and try again.", node); } } else { - log.debug("Could not find an available broker to send InitPidRequest to. " + + log.debug("Could not find an available broker to send InitProducerIdRequest to. " + "We will back off and try again."); } } catch (Exception e) { log.warn("Received an exception while trying to get a pid. Will back off and retry.", e); } - log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs); + log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs); time.sleep(retryBackoffMs); metadata.requestUpdate(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index ff3f114..566ad7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -35,8 +35,8 @@ import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; -import org.apache.kafka.common.requests.InitPidRequest; -import org.apache.kafka.common.requests.InitPidResponse; +import org.apache.kafka.common.requests.InitProducerIdRequest; +import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; @@ -79,7 +79,7 @@ public class TransactionManager { private volatile State currentState = State.UNINITIALIZED; private volatile Exception lastError = null; - private volatile PidAndEpoch pidAndEpoch; + private volatile ProducerIdAndEpoch producerIdAndEpoch; private enum State { UNINITIALIZED, @@ -130,7 +130,7 @@ public class TransactionManager { } public TransactionManager(String transactionalId, int transactionTimeoutMs) { - this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH); + this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH); this.sequenceNumbers = new HashMap<>(); this.transactionalId = transactionalId; this.transactionTimeoutMs = transactionTimeoutMs; @@ -155,10 +155,10 @@ public class TransactionManager { public synchronized TransactionalRequestResult initializeTransactions() { ensureTransactional(); transitionTo(State.INITIALIZING); - setPidAndEpoch(PidAndEpoch.NONE); + setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); this.sequenceNumbers.clear(); - InitPidRequest.Builder builder = new InitPidRequest.Builder(transactionalId, transactionTimeoutMs); - InitPidHandler handler = new InitPidHandler(builder); + InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); + InitProducerIdHandler handler = new InitProducerIdHandler(builder); pendingRequests.add(handler); return handler.result; } @@ -190,8 +190,8 @@ public class TransactionManager { } TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT; - EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, pidAndEpoch.producerId, - pidAndEpoch.epoch, transactionResult); + EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, + producerIdAndEpoch.epoch, transactionResult); EndTxnHandler handler = new EndTxnHandler(builder); pendingRequests.add(handler); return handler.result; @@ -206,7 +206,7 @@ public class TransactionManager { "active transaction"); AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId, - pidAndEpoch.producerId, pidAndEpoch.epoch, consumerGroupId); + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId); AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets); pendingRequests.add(handler); return handler.result; @@ -226,8 +226,8 @@ public class TransactionManager { return transactionalId; } - public boolean hasPid() { - return pidAndEpoch.isValid(); + public boolean hasProducerId() { + return producerIdAndEpoch.isValid(); } public boolean isTransactional() { @@ -262,20 +262,20 @@ public class TransactionManager { } /** - * Get the current pid and epoch without blocking. Callers must use {@link PidAndEpoch#isValid()} to + * Get the current pid and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to * verify that the result is valid. * - * @return the current PidAndEpoch. + * @return the current ProducerIdAndEpoch. */ - PidAndEpoch pidAndEpoch() { - return pidAndEpoch; + ProducerIdAndEpoch pidAndEpoch() { + return producerIdAndEpoch; } /** * Set the pid and epoch atomically. */ - void setPidAndEpoch(PidAndEpoch pidAndEpoch) { - this.pidAndEpoch = pidAndEpoch; + void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) { + this.producerIdAndEpoch = producerIdAndEpoch; } /** @@ -299,7 +299,7 @@ public class TransactionManager { if (isTransactional()) throw new IllegalStateException("Cannot reset producer state for a transactional producer. " + "You must either abort the ongoing transaction or reinitialize the transactional producer instead"); - setPidAndEpoch(PidAndEpoch.NONE); + setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); this.sequenceNumbers.clear(); } @@ -448,7 +448,7 @@ public class TransactionManager { pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction); newPartitionsToBeAddedToTransaction.clear(); AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId, - pidAndEpoch.producerId, pidAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction)); + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction)); return new AddPartitionsToTxnHandler(builder); } @@ -461,7 +461,7 @@ public class TransactionManager { pendingTxnOffsetCommits.put(entry.getKey(), committedOffset); } TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId, - pidAndEpoch.producerId, pidAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME, + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME, pendingTxnOffsetCommits); return new TxnOffsetCommitHandler(result, builder); } @@ -487,7 +487,7 @@ public class TransactionManager { void fenced() { log.error("Producer has become invalid, which typically means another producer with the same " + "transactional.id has been started: producerId: {}. epoch: {}.", - pidAndEpoch.producerId, pidAndEpoch.epoch); + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch); result.setError(Errors.INVALID_PRODUCER_EPOCH.exception()); transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception()); result.done(); @@ -548,15 +548,15 @@ public class TransactionManager { abstract Priority priority(); } - private class InitPidHandler extends TxnRequestHandler { - private final InitPidRequest.Builder builder; + private class InitProducerIdHandler extends TxnRequestHandler { + private final InitProducerIdRequest.Builder builder; - private InitPidHandler(InitPidRequest.Builder builder) { + private InitProducerIdHandler(InitProducerIdRequest.Builder builder) { this.builder = builder; } @Override - InitPidRequest.Builder requestBuilder() { + InitProducerIdRequest.Builder requestBuilder() { return builder; } @@ -567,11 +567,11 @@ public class TransactionManager { @Override public void handleResponse(AbstractResponse response) { - InitPidResponse initPidResponse = (InitPidResponse) response; - Errors error = initPidResponse.error(); + InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response; + Errors error = initProducerIdResponse.error(); if (error == Errors.NONE) { - PidAndEpoch pidAndEpoch = new PidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch()); - setPidAndEpoch(pidAndEpoch); + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); + setProducerIdAndEpoch(producerIdAndEpoch); transitionTo(State.READY); lastError = null; result.done(); @@ -581,7 +581,7 @@ public class TransactionManager { } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { reenqueue(); } else { - fatal(new KafkaException("Unexpected error in InitPidResponse; " + error.message())); + fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message())); } } } @@ -616,7 +616,7 @@ public class TransactionManager { reenqueue(); } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { reenqueue(); - } else if (error == Errors.INVALID_PID_MAPPING) { + } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING) { fatal(new KafkaException(error.exception())); } else if (error == Errors.INVALID_TXN_STATE) { fatal(new KafkaException(error.exception())); http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java index e5df248..c751bc4 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.errors; /** - * The transaction coordinator returns this error code if the timeout received via the InitPidRequest is larger than + * The transaction coordinator returns this error code if the timeout received via the InitProducerIdRequest is larger than * the `max.transaction.timeout.ms` config value. */ public class InvalidTxnTimeoutException extends ApiException { http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 960fdda..58a0a2a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -439,7 +439,8 @@ public enum Errors { return new InvalidTxnStateException(message); } }), - INVALID_PID_MAPPING(49, "The PID mapping is invalid", + INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producerId which is not currently assigned to " + + "its transactionalId", new ApiExceptionBuilder() { @Override public ApiException build(String message) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 04f2602..3aeb879 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -179,7 +179,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse { request = new DeleteRecordsRequest(struct, version); break; case INIT_PRODUCER_ID: - request = new InitPidRequest(struct, version); + request = new InitProducerIdRequest(struct, version); break; case OFFSET_FOR_LEADER_EPOCH: request = new OffsetsForLeaderEpochRequest(struct, version); http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index b76cb21..617934c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -95,7 +95,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case DELETE_RECORDS: return new DeleteRecordsResponse(struct); case INIT_PRODUCER_ID: - return new InitPidResponse(struct); + return new InitProducerIdResponse(struct); case OFFSET_FOR_LEADER_EPOCH: return new OffsetsForLeaderEpochResponse(struct); case ADD_PARTITIONS_TO_TXN: http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java deleted file mode 100644 index 57d32e2..0000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Struct; - -import java.nio.ByteBuffer; - -public class InitPidRequest extends AbstractRequest { - public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE; - - private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms"; - - private final String transactionalId; - private final int transactionTimeoutMs; - - public static class Builder extends AbstractRequest.Builder<InitPidRequest> { - private final String transactionalId; - private final int transactionTimeoutMs; - - public Builder(String transactionalId) { - this(transactionalId, NO_TRANSACTION_TIMEOUT_MS); - } - - public Builder(String transactionalId, int transactionTimeoutMs) { - super(ApiKeys.INIT_PRODUCER_ID); - - if (transactionTimeoutMs <= 0) - throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs); - - if (transactionalId != null && transactionalId.isEmpty()) - throw new IllegalArgumentException("Must set either a null or a non-empty transactional id."); - - this.transactionalId = transactionalId; - this.transactionTimeoutMs = transactionTimeoutMs; - } - - @Override - public InitPidRequest build(short version) { - return new InitPidRequest(version, transactionalId, transactionTimeoutMs); - } - - @Override - public String toString() { - return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" + - transactionTimeoutMs + ")"; - } - } - - public InitPidRequest(Struct struct, short version) { - super(version); - this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME); - } - - private InitPidRequest(short version, String transactionalId, int transactionTimeoutMs) { - super(version); - this.transactionalId = transactionalId; - this.transactionTimeoutMs = transactionTimeoutMs; - } - - @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new InitPidResponse(throttleTimeMs, Errors.forException(e)); - } - - public static InitPidRequest parse(ByteBuffer buffer, short version) { - return new InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version); - } - - public String transactionalId() { - return transactionalId; - } - - public int transactionTimeoutMs() { - return transactionTimeoutMs; - } - - @Override - protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version())); - struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs); - return struct; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java deleted file mode 100644 index 3c858af..0000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.record.RecordBatch; - -import java.nio.ByteBuffer; - -public class InitPidResponse extends AbstractResponse { - /** - * Possible Error codes: - * OK - * - */ - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; - private static final String PRODUCER_ID_KEY_NAME = "producer_id"; - private static final String EPOCH_KEY_NAME = "producer_epoch"; - private static final String ERROR_CODE_KEY_NAME = "error_code"; - private final int throttleTimeMs; - private final Errors error; - private final long producerId; - private final short epoch; - - public InitPidResponse(int throttleTimeMs, Errors error, long producerId, short epoch) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; - this.producerId = producerId; - this.epoch = epoch; - } - - public InitPidResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); - this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); - this.epoch = struct.getShort(EPOCH_KEY_NAME); - } - - public InitPidResponse(int throttleTimeMs, Errors errors) { - this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0); - } - - public int throttleTimeMs() { - return throttleTimeMs; - } - - public long producerId() { - return producerId; - } - - public Errors error() { - return error; - } - - public short epoch() { - return epoch; - } - - @Override - protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(PRODUCER_ID_KEY_NAME, producerId); - struct.set(EPOCH_KEY_NAME, epoch); - struct.set(ERROR_CODE_KEY_NAME, error.code()); - return struct; - } - - public static InitPidResponse parse(ByteBuffer buffer, short version) { - return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer)); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java new file mode 100644 index 0000000..45f88a2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class InitProducerIdRequest extends AbstractRequest { + public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE; + + private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; + private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms"; + + private final String transactionalId; + private final int transactionTimeoutMs; + + public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> { + private final String transactionalId; + private final int transactionTimeoutMs; + + public Builder(String transactionalId) { + this(transactionalId, NO_TRANSACTION_TIMEOUT_MS); + } + + public Builder(String transactionalId, int transactionTimeoutMs) { + super(ApiKeys.INIT_PRODUCER_ID); + + if (transactionTimeoutMs <= 0) + throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs); + + if (transactionalId != null && transactionalId.isEmpty()) + throw new IllegalArgumentException("Must set either a null or a non-empty transactional id."); + + this.transactionalId = transactionalId; + this.transactionTimeoutMs = transactionTimeoutMs; + } + + @Override + public InitProducerIdRequest build(short version) { + return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs); + } + + @Override + public String toString() { + return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" + + transactionTimeoutMs + ")"; + } + } + + public InitProducerIdRequest(Struct struct, short version) { + super(version); + this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); + this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME); + } + + private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) { + super(version); + this.transactionalId = transactionalId; + this.transactionTimeoutMs = transactionTimeoutMs; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e)); + } + + public static InitProducerIdRequest parse(ByteBuffer buffer, short version) { + return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version); + } + + public String transactionalId() { + return transactionalId; + } + + public int transactionTimeoutMs() { + return transactionTimeoutMs; + } + + @Override + protected Struct toStruct() { + Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version())); + struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); + struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs); + return struct; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java new file mode 100644 index 0000000..7c8a6e5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; + +import java.nio.ByteBuffer; + +public class InitProducerIdResponse extends AbstractResponse { + /** + * Possible Error codes: + * OK + * + */ + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; + private static final String PRODUCER_ID_KEY_NAME = "producer_id"; + private static final String EPOCH_KEY_NAME = "producer_epoch"; + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private final int throttleTimeMs; + private final Errors error; + private final long producerId; + private final short epoch; + + public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) { + this.throttleTimeMs = throttleTimeMs; + this.error = error; + this.producerId = producerId; + this.epoch = epoch; + } + + public InitProducerIdResponse(Struct struct) { + this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); + this.epoch = struct.getShort(EPOCH_KEY_NAME); + } + + public InitProducerIdResponse(int throttleTimeMs, Errors errors) { + this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0); + } + + public int throttleTimeMs() { + return throttleTimeMs; + } + + public long producerId() { + return producerId; + } + + public Errors error() { + return error; + } + + public short epoch() { + return epoch; + } + + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version)); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(PRODUCER_ID_KEY_NAME, producerId); + struct.set(EPOCH_KEY_NAME, epoch); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + return struct; + } + + public static InitProducerIdResponse parse(ByteBuffer buffer, short version) { + return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index c01a375..bb13dcb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -37,9 +37,9 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.ProduceRequest; -import org.apache.kafka.common.requests.InitPidRequest; -import org.apache.kafka.common.requests.InitPidResponse; +import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; @@ -382,11 +382,11 @@ public class SenderTest { client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { - return body instanceof InitPidRequest; + return body instanceof InitProducerIdRequest; } - }, new InitPidResponse(0, Errors.NONE, producerId, (short) 0)); + }, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0)); sender.run(time.milliseconds()); - assertTrue(transactionManager.hasPid()); + assertTrue(transactionManager.hasProducerId()); assertEquals(producerId, transactionManager.pidAndEpoch().producerId); assertEquals((short) 0, transactionManager.pidAndEpoch().epoch); } @@ -395,7 +395,7 @@ public class SenderTest { public void testSequenceNumberIncrement() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); - transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0)); + transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0)); setupWithTransactionState(transactionManager); client.setNode(new Node(1, "localhost", 33343)); @@ -448,7 +448,7 @@ public class SenderTest { public void testAbortRetryWhenPidChanges() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); - transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0)); + transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0)); setupWithTransactionState(transactionManager); client.setNode(new Node(1, "localhost", 33343)); @@ -480,7 +480,7 @@ public class SenderTest { assertEquals(0, client.inFlightRequestCount()); assertFalse("Client ready status should be false", client.isReady(node, 0L)); - transactionManager.setPidAndEpoch(new PidAndEpoch(producerId + 1, (short) 0)); + transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0)); sender.run(time.milliseconds()); // receive error sender.run(time.milliseconds()); // reconnect sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors. @@ -497,7 +497,7 @@ public class SenderTest { public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); - transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0)); + transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0)); setupWithTransactionState(transactionManager); client.setNode(new Node(1, "localhost", 33343)); @@ -528,7 +528,7 @@ public class SenderTest { sender.run(time.milliseconds()); assertTrue(responseFuture.isDone()); - assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasPid()); + assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId()); } private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception { http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 8e46eb7..53686e2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -44,8 +44,8 @@ import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; -import org.apache.kafka.common.requests.InitPidRequest; -import org.apache.kafka.common.requests.InitPidResponse; +import org.apache.kafka.common.requests.InitProducerIdRequest; +import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.TransactionResult; @@ -163,7 +163,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // get pid. - assertTrue(transactionManager.hasPid()); + assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); @@ -275,7 +275,7 @@ public class TransactionManagerTest { assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); assertFalse(initPidResult.isCompleted()); - assertFalse(transactionManager.hasPid()); + assertFalse(transactionManager.hasProducerId()); prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); sender.run(time.milliseconds()); @@ -285,7 +285,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // get pid and epoch assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed. - assertTrue(transactionManager.hasPid()); + assertTrue(transactionManager.hasProducerId()); assertEquals(pid, transactionManager.pidAndEpoch().producerId); assertEquals(epoch, transactionManager.pidAndEpoch().epoch); } @@ -308,7 +308,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // get pid. - assertTrue(transactionManager.hasPid()); + assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); @@ -365,7 +365,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // get pid. - assertTrue(transactionManager.hasPid()); + assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); // User does one producer.sed transactionManager.maybeAddPartitionToTransaction(tp0); @@ -428,7 +428,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // get pid. - assertTrue(transactionManager.hasPid()); + assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); @@ -463,7 +463,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // get pid. - assertTrue(transactionManager.hasPid()); + assertTrue(transactionManager.hasProducerId()); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); @@ -530,12 +530,12 @@ public class TransactionManagerTest { client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { - InitPidRequest initPidRequest = (InitPidRequest) body; - assertEquals(initPidRequest.transactionalId(), transactionalId); - assertEquals(initPidRequest.transactionTimeoutMs(), transactionTimeoutMs); + InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body; + assertEquals(initProducerIdRequest.transactionalId(), transactionalId); + assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs); return true; } - }, new InitPidResponse(0, error, pid, epoch), shouldDisconnect); + }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect); } private void prepareProduceResponse(Errors error, final long pid, final short epoch) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index b1e83bf..cbfb6a9 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -879,12 +879,12 @@ public class RequestResponseTest { return new DeleteTopicsResponse(errors); } - private InitPidRequest createInitPidRequest() { - return new InitPidRequest.Builder(null, 100).build(); + private InitProducerIdRequest createInitPidRequest() { + return new InitProducerIdRequest.Builder(null, 100).build(); } - private InitPidResponse createInitPidResponse() { - return new InitPidResponse(0, Errors.NONE, 3332, (short) 3); + private InitProducerIdResponse createInitPidResponse() { + return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala index bb7f57b..916ffa9 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala @@ -20,49 +20,49 @@ import kafka.common.KafkaException import kafka.utils.{Json, Logging, ZkUtils} /** - * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds (PIDs) in a unique way - * such that the same PID will not be assigned twice across multiple transaction coordinators. + * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way + * such that the same producerId will not be assigned twice across multiple transaction coordinators. * - * PIDs are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who - * claims the block, where the written block_start_pid and block_end_pid are both inclusive. + * ProducerIds are managed via ZooKeeper, where the latest producerId block is written on the corresponding ZK + * path by the manager who claims the block, where the written block_start and block_end are both inclusive. */ object ProducerIdManager extends Logging { val CurrentVersion: Long = 1L val PidBlockSize: Long = 1000L - def generatePidBlockJson(pidBlock: ProducerIdBlock): String = { + def generateProducerIdBlockJson(producerIdBlock: ProducerIdBlock): String = { Json.encode(Map("version" -> CurrentVersion, - "broker" -> pidBlock.brokerId, - "block_start" -> pidBlock.blockStartPid.toString, - "block_end" -> pidBlock.blockEndPid.toString) + "broker" -> producerIdBlock.brokerId, + "block_start" -> producerIdBlock.blockStartId.toString, + "block_end" -> producerIdBlock.blockEndId.toString) ) } - def parsePidBlockData(jsonData: String): ProducerIdBlock = { + def parseProducerIdBlockData(jsonData: String): ProducerIdBlock = { try { Json.parseFull(jsonData).flatMap { m => - val pidBlockInfo = m.asInstanceOf[Map[String, Any]] - val brokerId = pidBlockInfo("broker").asInstanceOf[Int] - val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong - val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong - Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID)) - }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData")) + val producerIdBlockInfo = m.asInstanceOf[Map[String, Any]] + val brokerId = producerIdBlockInfo("broker").asInstanceOf[Int] + val blockStart = producerIdBlockInfo("block_start").asInstanceOf[String].toLong + val blockEnd = producerIdBlockInfo("block_end").asInstanceOf[String].toLong + Some(ProducerIdBlock(brokerId, blockStart, blockEnd)) + }.getOrElse(throw new KafkaException(s"Failed to parse the producerId block json $jsonData")) } catch { case e: java.lang.NumberFormatException => // this should never happen: the written data has exceeded long type limit - fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit") + fatal(s"Read jason data $jsonData contains producerIds that have exceeded long type limit") throw e } } } -case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) { +case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long) { override def toString: String = { - val pidBlockInfo = new StringBuilder - pidBlockInfo.append("(brokerId:" + brokerId) - pidBlockInfo.append(",blockStartPID:" + blockStartPid) - pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")") - pidBlockInfo.toString() + val producerIdBlockInfo = new StringBuilder + producerIdBlockInfo.append("(brokerId:" + brokerId) + producerIdBlockInfo.append(",blockStartProducerId:" + blockStartId) + producerIdBlockInfo.append(",blockEndProducerId:" + blockEndId + ")") + producerIdBlockInfo.toString() } } @@ -70,84 +70,85 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging this.logIdent = "[ProducerId Manager " + brokerId + "]: " - private var currentPIDBlock: ProducerIdBlock = null - private var nextPID: Long = -1L + private var currentProducerIdBlock: ProducerIdBlock = null + private var nextProducerId: Long = -1L - // grab the first block of PIDs + // grab the first block of producerIds this synchronized { - getNewPidBlock() - nextPID = currentPIDBlock.blockStartPid + getNewProducerIdBlock() + nextProducerId = currentProducerIdBlock.blockStartId } - private def getNewPidBlock(): Unit = { + private def getNewProducerIdBlock(): Unit = { var zkWriteComplete = false while (!zkWriteComplete) { - // refresh current pid block from zookeeper again - val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath) + // refresh current producerId block from zookeeper again + val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath) - // generate the new pid block - currentPIDBlock = dataOpt match { + // generate the new producerId block + currentProducerIdBlock = dataOpt match { case Some(data) => - val currPIDBlock = ProducerIdManager.parsePidBlockData(data) - debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion") + val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data) + debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion") - if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) { - // we have exhausted all pids (wow!), treat it as a fatal error - fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})") - throw new KafkaException("Have exhausted all pids.") + if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) { + // we have exhausted all producerIds (wow!), treat it as a fatal error + fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})") + throw new KafkaException("Have exhausted all producerIds.") } - ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize) + ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize) case None => - debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block") + debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block") ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1) } - val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock) + val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock) - // try to write the new pid block into zookeeper - val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData)) + // try to write the new producerId block into zookeeper + val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.ProducerIdBlockPath, + newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData)) zkWriteComplete = succeeded if (zkWriteComplete) - info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version") + info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version") } } - private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = { + private def checkProducerIdBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = { try { - val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData) - val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath) + val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData) + val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath) dataOpt match { case Some(data) => - val currPIDBlock = ProducerIdManager.parsePidBlockData(data) - (currPIDBlock.equals(expectedPidBlock), zkVersion) + val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data) + (currProducerIdBLock == expectedPidBlock, zkVersion) case None => (false, -1) } } catch { case e: Exception => - warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e) - + warn(s"Error while checking for producerId block Zk data on path $path: expected data $expectedData", e) + (false, -1) } } - def nextPid(): Long = { + def generateProducerId(): Long = { this synchronized { - // grab a new block of PIDs if this block has been exhausted - if (nextPID > currentPIDBlock.blockEndPid) { - getNewPidBlock() - nextPID = currentPIDBlock.blockStartPid + 1 + // grab a new block of producerIds if this block has been exhausted + if (nextProducerId > currentProducerIdBlock.blockEndId) { + getNewProducerIdBlock() + nextProducerId = currentProducerIdBlock.blockStartId + 1 } else { - nextPID += 1 + nextProducerId += 1 } - nextPID - 1 + nextProducerId - 1 } } def shutdown() { - info(s"Shutdown complete: last PID assigned $nextPID") + info(s"Shutdown complete: last producerId assigned $nextProducerId") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 7632f3f..233f7d7 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -47,20 +47,20 @@ object TransactionCoordinator { config.transactionTopicMinISR, config.transactionTransactionsExpiredTransactionCleanupIntervalMs) - val pidManager = new ProducerIdManager(config.brokerId, zkUtils) + val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils) val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time) val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false) val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, txnMarkerPurgatory, time) - new TransactionCoordinator(config.brokerId, scheduler, pidManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time) + new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time) } - private def initTransactionError(error: Errors): InitPidResult = { - InitPidResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error) + private def initTransactionError(error: Errors): InitProducerIdResult = { + InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error) } - private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitPidResult = { - InitPidResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE) + private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitProducerIdResult = { + InitProducerIdResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE) } } @@ -74,7 +74,7 @@ object TransactionCoordinator { */ class TransactionCoordinator(brokerId: Int, scheduler: Scheduler, - pidManager: ProducerIdManager, + producerIdManager: ProducerIdManager, txnManager: TransactionStateManager, txnMarkerChannelManager: TransactionMarkerChannelManager, txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker], @@ -83,22 +83,21 @@ class TransactionCoordinator(brokerId: Int, import TransactionCoordinator._ - type InitPidCallback = InitPidResult => Unit + type InitProducerIdCallback = InitProducerIdResult => Unit type AddPartitionsCallback = Errors => Unit type EndTxnCallback = Errors => Unit /* Active flag of the coordinator */ private val isActive = new AtomicBoolean(false) - def handleInitPid(transactionalId: String, - transactionTimeoutMs: Int, - responseCallback: InitPidCallback): Unit = { - + def handleInitProducerId(transactionalId: String, + transactionTimeoutMs: Int, + responseCallback: InitProducerIdCallback): Unit = { if (transactionalId == null || transactionalId.isEmpty) { // if the transactional id is not specified, then always blindly accept the request - // and return a new pid from the pid manager - val pid = pidManager.nextPid() - responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE)) + // and return a new producerId from the producerId manager + val producerId = producerIdManager.generateProducerId() + responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE)) } else if (!txnManager.isCoordinatorFor(transactionalId)) { // check if it is the assigned coordinator for the transactional id responseCallback(initTransactionError(Errors.NOT_COORDINATOR)) @@ -108,12 +107,12 @@ class TransactionCoordinator(brokerId: Int, // check transactionTimeoutMs is not larger than the broker configured maximum allowed value responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT)) } else { - // only try to get a new pid and update the cache if the transactional id is unknown - val result: Either[InitPidResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match { + // only try to get a new producerId and update the cache if the transactional id is unknown + val result: Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match { case None => - val pid = pidManager.nextPid() + val producerId = producerIdManager.generateProducerId() val now = time.milliseconds() - val createdMetadata = new TransactionMetadata(producerId = pid, + val createdMetadata = new TransactionMetadata(producerId = producerId, producerEpoch = 0, txnTimeoutMs = transactionTimeoutMs, state = Empty, @@ -129,7 +128,7 @@ class TransactionCoordinator(brokerId: Int, // in this case we will treat it as the metadata has existed already txnMetadata synchronized { if (!txnMetadata.eq(createdMetadata)) { - initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata) + initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata) } else { Right(coordinatorEpoch, txnMetadata.prepareNewPid(time.milliseconds())) } @@ -140,13 +139,13 @@ class TransactionCoordinator(brokerId: Int, val txnMetadata = existingEpochAndMetadata.transactionMetadata txnMetadata synchronized { - initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata) + initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata) } } result match { - case Left(pidResult) => - responseCallback(pidResult) + case Left(producerIdResult) => + responseCallback(producerIdResult) case Right((coordinatorEpoch, newMetadata)) => if (newMetadata.txnState == Ongoing) { @@ -178,11 +177,10 @@ class TransactionCoordinator(brokerId: Int, } } - private def initPidWithExistingMetadata(transactionalId: String, - transactionTimeoutMs: Int, - coordinatorEpoch: Int, - txnMetadata: TransactionMetadata): Either[InitPidResult, (Int, TransactionMetadataTransition)] = { - + private def initProducerIdWithExistingMetadata(transactionalId: String, + transactionTimeoutMs: Int, + coordinatorEpoch: Int, + txnMetadata: TransactionMetadata): Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = { if (txnMetadata.pendingTransitionInProgress) { // return a retriable exception to let the client backoff and retry Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS)) @@ -216,8 +214,8 @@ class TransactionCoordinator(brokerId: Int, def handleAddPartitionsToTransaction(transactionalId: String, - pid: Long, - epoch: Short, + producerId: Long, + producerEpoch: Short, partitions: collection.Set[TopicPartition], responseCallback: AddPartitionsCallback): Unit = { val error = validateTransactionalId(transactionalId) @@ -225,10 +223,10 @@ class TransactionCoordinator(brokerId: Int, responseCallback(error) } else { // try to update the transaction metadata and append the updated metadata to txn log; - // if there is no such metadata treat it as invalid pid mapping error. + // if there is no such metadata treat it as invalid producerId mapping error. val result: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match { case None => - Left(Errors.INVALID_PID_MAPPING) + Left(Errors.INVALID_PRODUCER_ID_MAPPING) case Some(epochAndMetadata) => val coordinatorEpoch = epochAndMetadata.coordinatorEpoch @@ -236,9 +234,9 @@ class TransactionCoordinator(brokerId: Int, // generate the new transaction metadata with added partitions txnMetadata synchronized { - if (txnMetadata.producerId != pid) { - Left(Errors.INVALID_PID_MAPPING) - } else if (txnMetadata.producerEpoch != epoch) { + if (txnMetadata.producerId != producerId) { + Left(Errors.INVALID_PRODUCER_ID_MAPPING) + } else if (txnMetadata.producerEpoch != producerEpoch) { Left(Errors.INVALID_PRODUCER_EPOCH) } else if (txnMetadata.pendingTransitionInProgress) { // return a retriable exception to let the client backoff and retry @@ -274,8 +272,8 @@ class TransactionCoordinator(brokerId: Int, } def handleEndTransaction(transactionalId: String, - pid: Long, - epoch: Short, + producerId: Long, + producerEpoch: Short, txnMarkerResult: TransactionResult, responseCallback: EndTxnCallback): Unit = { val error = validateTransactionalId(transactionalId) @@ -284,16 +282,16 @@ class TransactionCoordinator(brokerId: Int, else { val preAppendResult: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match { case None => - Left(Errors.INVALID_PID_MAPPING) + Left(Errors.INVALID_PRODUCER_ID_MAPPING) case Some(epochAndTxnMetadata) => val txnMetadata = epochAndTxnMetadata.transactionMetadata val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch txnMetadata synchronized { - if (txnMetadata.producerId != pid) - Left(Errors.INVALID_PID_MAPPING) - else if (txnMetadata.producerEpoch != epoch) + if (txnMetadata.producerId != producerId) + Left(Errors.INVALID_PRODUCER_ID_MAPPING) + else if (txnMetadata.producerEpoch != producerEpoch) Left(Errors.INVALID_PRODUCER_EPOCH) else if (txnMetadata.pendingTransitionInProgress) Left(Errors.CONCURRENT_TRANSACTIONS) @@ -343,9 +341,9 @@ class TransactionCoordinator(brokerId: Int, val txnMetadata = epochAndMetadata.transactionMetadata txnMetadata synchronized { - if (txnMetadata.producerId != pid) - Left(Errors.INVALID_PID_MAPPING) - else if (txnMetadata.producerEpoch != epoch) + if (txnMetadata.producerId != producerId) + Left(Errors.INVALID_PRODUCER_ID_MAPPING) + else if (txnMetadata.producerEpoch != producerEpoch) Left(Errors.INVALID_PRODUCER_EPOCH) else if (txnMetadata.pendingTransitionInProgress) Left(Errors.CONCURRENT_TRANSACTIONS) @@ -452,11 +450,11 @@ class TransactionCoordinator(brokerId: Int, isActive.set(false) scheduler.shutdown() txnMarkerPurgatory.shutdown() - pidManager.shutdown() + producerIdManager.shutdown() txnManager.shutdown() txnMarkerChannelManager.shutdown() info("Shutdown complete.") } } -case class InitPidResult(pid: Long, epoch: Short, error: Errors) +case class InitProducerIdResult(producerId: Long, producerEpoch: Short, error: Errors) http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index b7a2e80..90c9c42 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -251,7 +251,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig, addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet) } - def addTxnMarkersToBrokerQueue(transactionalId: String, pid: Long, epoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = { + def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short, + result: TransactionResult, coordinatorEpoch: Int, + topicPartitions: immutable.Set[TopicPartition]): Unit = { val txnTopicPartition = txnStateManager.partitionFor(transactionalId) val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition => var brokerNode: Option[Node] = None @@ -269,7 +271,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } for ((broker: Node, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) { - val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava)) + val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava) + val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker) addMarkersForBroker(broker, txnTopicPartition, txnIdAndMarker) }
