This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new b7be97b MINOR: Fix ConcurrentModificationException in
TransactionManager (#4608)
b7be97b is described below
commit b7be97b4db46881954fe504349110a8475408a8e
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Feb 22 22:23:14 2018 -0800
MINOR: Fix ConcurrentModificationException in TransactionManager (#4608)
---
checkstyle/suppressions.xml | 2 +-
.../producer/internals/TransactionManager.java | 6 +++--
.../producer/internals/TransactionManagerTest.java | 27 ++++++++++++++++++++++
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index bc7a2bd..d88c42b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -54,7 +54,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator).java"/>
<suppress checks="JavaNCSS"
-
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
+
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>
<suppress checks="JavaNCSS"
files="AbstractRequest.java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 006a12b..b242d5a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
@@ -568,14 +569,15 @@ public class TransactionManager {
if (isTransactional())
// We should not reset producer state if we are transactional. We
will transition to a fatal error instead.
return false;
- for (TopicPartition topicPartition :
partitionsWithUnresolvedSequences) {
+ for (Iterator<TopicPartition> iter =
partitionsWithUnresolvedSequences.iterator(); iter.hasNext(); ) {
+ TopicPartition topicPartition = iter.next();
if (!hasInflightBatches(topicPartition)) {
// The partition has been fully drained. At this point, the
last ack'd sequence should be one less than
// next sequence destined for the partition. If so, the
partition is fully resolved. If not, we should
// reset the sequence number if necessary.
if (isNextSequence(topicPartition,
sequenceNumber(topicPartition))) {
// This would happen when a batch was expired, but
subsequent batches succeeded.
- partitionsWithUnresolvedSequences.remove(topicPartition);
+ iter.remove();
} else {
// We would enter this branch if all in flight batches
were ultimately expired in the producer.
log.info("No inflight batches remaining for {}, last ack'd
sequence for partition is {}, next sequence is {}. " +
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index fab139a..6fcf480 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -2213,6 +2213,33 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasOngoingTransaction());
}
+ @Test
+ public void testShouldResetProducerStateAfterResolvingSequences() throws
InterruptedException, ExecutionException {
+ // Create a TransactionManager without a transactionalId to test
+ // shouldResetProducerStateAfterResolvingSequences.
+ TransactionManager manager = new TransactionManager(logContext, null,
transactionTimeoutMs,
+ DEFAULT_RETRY_BACKOFF_MS);
+ assertFalse(manager.shouldResetProducerStateAfterResolvingSequences());
+ TopicPartition tp0 = new TopicPartition("foo", 0);
+ TopicPartition tp1 = new TopicPartition("foo", 1);
+ assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp0));
+ assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp1));
+
+ manager.incrementSequenceNumber(tp0, 1);
+ manager.incrementSequenceNumber(tp1, 1);
+ manager.maybeUpdateLastAckedSequence(tp0, 0);
+ manager.maybeUpdateLastAckedSequence(tp1, 0);
+ manager.markSequenceUnresolved(tp0);
+ manager.markSequenceUnresolved(tp1);
+ assertFalse(manager.shouldResetProducerStateAfterResolvingSequences());
+
+ manager.maybeUpdateLastAckedSequence(tp0, 5);
+ manager.incrementSequenceNumber(tp0, 1);
+ manager.markSequenceUnresolved(tp0);
+ manager.markSequenceUnresolved(tp1);
+ assertTrue(manager.shouldResetProducerStateAfterResolvingSequences());
+ }
+
private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors
error) throws InterruptedException {
final long pid = 1L;
final short epoch = 1;