This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 341fd7b  KAFKA-6634: Delay starting new transaction in 
task.initializeTopology (#4684)
341fd7b is described below

commit 341fd7b533447d2ee73867c179403bd16b3de094
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Mar 13 08:43:58 2018 -0700

    KAFKA-6634: Delay starting new transaction in task.initializeTopology 
(#4684)
    
    As titled, not starting new transaction since during restoration producer 
would have not activity and hence may cause txn expiration. Also delay starting 
new txn in resuming until initializing topology.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck 
<b...@confluent.io>
---
 .../streams/processor/internals/AssignedTasks.java |  9 +++++++-
 .../streams/processor/internals/StreamTask.java    | 24 ++++++++++++++--------
 .../processor/internals/StreamTaskTest.java        |  7 +++++++
 3 files changed, 31 insertions(+), 9 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index fc82c6e..d59ec2b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -226,7 +226,14 @@ class AssignedTasks<T extends AbstractTask> {
                 suspended.remove(taskId);
                 log.trace("{} Resuming suspended {} {} with assigned 
partitions {}", logPrefix, taskTypeName, taskId, partitions);
                 task.resume();
-                transitionToRunning(task);
+                try {
+                    transitionToRunning(task);
+                } catch (final ProducerFencedException e) {
+                    closeZombieTask(task);
+                    suspended.remove(taskId);
+                    running.remove(task.id());
+                    throw e;
+                }
                 return true;
             } else {
                 log.trace("{} couldn't resume task {} assigned partitions {}, 
task partitions", logPrefix, taskId, partitions, task.partitions);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b628985..c2cc032 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -139,10 +139,11 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
         this.time = time;
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // initialize transactions if eos is turned on, which will block if 
the previous transaction has not
+        // completed yet; do not start the first transaction until the 
topology has been initialized later
         if (eosEnabled) {
             this.producer.initTransactions();
-            this.producer.beginTransaction();
-            transactionInFlight = true;
         }
     }
 
@@ -153,26 +154,33 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
         return changelogPartitions().isEmpty();
     }
 
+    /**
+     * <pre>
+     * - (re-)initialize the topology of the task
+     * </pre>
+     * @throws ProducerFencedException
+     */
     @Override
     public void initializeTopology() {
         initTopology();
+
+        if (eosEnabled) {
+            this.producer.beginTransaction();
+            transactionInFlight = true;
+        }
+
         processorContext.initialized();
         taskInitialized = true;
     }
 
     /**
      * <pre>
-     * - re-initialize the task
-     * - if (eos) begin new transaction
+     * - resume the task
      * </pre>
      */
     @Override
     public void resume() {
         log.debug("{} Resuming", logPrefix);
-        if (eosEnabled) {
-            producer.beginTransaction();
-            transactionInFlight = true;
-        }
         initTopology();
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 25de93c..0340913 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -635,6 +635,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         assertTrue(producer.transactionInitialized());
         assertTrue(producer.transactionInFlight());
@@ -664,6 +665,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 
0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -680,6 +682,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.suspend();
         assertTrue(producer.transactionCommitted());
@@ -707,6 +710,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 
0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -714,6 +718,7 @@ public class StreamTaskTest {
         task.suspend();
 
         task.resume();
+        task.initializeTopology();
         assertTrue(producer.transactionInFlight());
     }
 
@@ -737,6 +742,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 
0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -765,6 +771,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.close(false, false);
         task = null;

-- 
To stop receiving notification emails like this one, please contact
mj...@apache.org.

Reply via email to