This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new c7bdec7  KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
c7bdec7 is described below

commit c7bdec74bad366f485d055a68e910dd55cc65728
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Mar 13 08:43:58 2018 -0700

    KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
    
    As titled: do not start a new transaction when the task is created. The
    producer has no activity during restoration, so a transaction begun at
    creation time may expire. Also delay starting a new transaction on resume
    until the topology is initialized.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <b...@confluent.io>
---
 .../streams/processor/internals/AssignedTasks.java | 13 +++++--
 .../streams/processor/internals/StreamTask.java    | 41 ++++++++++++----------
 .../streams/processor/internals/StreamThread.java  |  6 ----
 .../streams/processor/internals/TaskManager.java   | 12 +------
 .../internals/AssignedStreamsTasksTest.java        |  3 +-
 .../processor/internals/StreamTaskTest.java        |  7 ++++
 6 files changed, 43 insertions(+), 39 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 2cd82f4..029f745 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -90,6 +90,7 @@ abstract class AssignedTasks<T extends Task> {
      * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     Set<TopicPartition> initializeNewTasks() {
         final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -239,17 +240,22 @@ abstract class AssignedTasks<T extends Task> {
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
+                task.resume();
                 try {
-                    task.resume();
+                    transitionToRunning(task, new HashSet<TopicPartition>());
                 } catch (final TaskMigratedException e) {
+                    // we need to catch migration exception internally since 
this function
+                    // is triggered in the rebalance callback
+                    log.info("Failed to resume {} {} since it got migrated to 
another thread already. " +
+                            "Closing it as zombie before triggering a new 
rebalance.", taskTypeName, task.id());
                     final RuntimeException fatalException = 
closeZombieTask(task);
+                    running.remove(task.id());
                     if (fatalException != null) {
                         throw fatalException;
                     }
                     suspended.remove(taskId);
                     throw e;
                 }
-                transitionToRunning(task, new HashSet<TopicPartition>());
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
@@ -269,6 +275,9 @@ abstract class AssignedTasks<T extends Task> {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void transitionToRunning(final T task, final Set<TopicPartition> 
readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6bca02a..d04be04 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
      * @param cache                 the {@link ThreadCache} created by the 
thread
      * @param time                  the system {@link Time} of the thread
      * @param producer              the instance of {@link Producer} used to 
produce records
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
                       final Collection<TopicPartition> partitions,
@@ -149,14 +148,11 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         partitionGroup = new PartitionGroup(partitionQueues);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // initialize transactions if eos is turned on, which will block if 
the previous transaction has not
+        // completed yet; do not start the first transaction until the 
topology has been initialized later
         if (eosEnabled) {
-            try {
-                this.producer.initTransactions();
-                this.producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
+            this.producer.initTransactions();
         }
     }
 
@@ -167,31 +163,38 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         return changelogPartitions().isEmpty();
     }
 
+    /**
+     * <pre>
+     * - (re-)initialize the topology of the task
+     * </pre>
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     @Override
     public void initializeTopology() {
         initTopology();
+
+        if (eosEnabled) {
+            try {
+                this.producer.beginTransaction();
+            } catch (final ProducerFencedException fatal) {
+                throw new TaskMigratedException(this, fatal);
+            }
+            transactionInFlight = true;
+        }
+
         processorContext.initialized();
         taskInitialized = true;
     }
 
     /**
      * <pre>
-     * - re-initialize the task
-     * - if (eos) begin new transaction
+     * - resume the task
      * </pre>
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @Override
     public void resume() {
+        // nothing to do; new transaction will be started only after topology 
is initialized
         log.debug("Resuming");
-        if (eosEnabled) {
-            try {
-                producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
-        }
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb133c6..2937fc7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -346,9 +346,6 @@ public class StreamThread extends Thread {
             return stateDirectory;
         }
 
-        /**
-         * @throws TaskMigratedException if the task producer got fenced (EOS 
only)
-         */
         Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, 
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
             final List<T> createdTasks = new ArrayList<>();
             for (final Map.Entry<TaskId, Set<TopicPartition>> 
newTaskAndPartitions : tasksToBeCreated.entrySet()) {
@@ -401,9 +398,6 @@ public class StreamThread extends Thread {
             this.threadClientId = threadClientId;
         }
 
-        /**
-         * @throws TaskMigratedException if the task producer got fenced (EOS 
only)
-         */
         @Override
         StreamTask createTask(final Consumer<byte[], byte[]> consumer, final 
TaskId taskId, final Set<TopicPartition> partitions) {
             taskCreatedSensor.record();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 62ddacf..72d7679 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -94,9 +94,6 @@ class TaskManager {
         this.adminClient = adminClient;
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     void createTasks(final Collection<TopicPartition> assignment) {
         if (consumer == null) {
             throw new IllegalStateException(logPrefix + "consumer has not been 
initialized while adding stream tasks. This should not happen.");
@@ -114,9 +111,6 @@ class TaskManager {
         consumer.pause(partitions);
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
         if (assignedActiveTasks.isEmpty()) {
             return;
@@ -156,9 +150,6 @@ class TaskManager {
         }
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStandbyTasks() {
         final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = 
this.assignedStandbyTasks;
         if (assignedStandbyTasks.isEmpty()) {
@@ -173,7 +164,6 @@ class TaskManager {
             if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
                 newStandbyTasks.put(taskId, partitions);
             }
-
         }
 
         if (newStandbyTasks.isEmpty()) {
@@ -320,7 +310,7 @@ class TaskManager {
     /**
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
-     * @throws TaskMigratedException if another thread wrote to the changelog 
topic that is currently restored
+     * @throws TaskMigratedException if the task producer got fenced or 
consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
         final Set<TopicPartition> resumed = active.initializeNewTasks();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 4bb7828..246d047 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -256,9 +256,10 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+    public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
         mockRunningTaskSuspension();
         t1.resume();
+        t1.initializeTopology();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
         t1.close(false, true);
         EasyMock.expectLastCall();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1165d76..a305829 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -776,6 +776,7 @@ public class StreamTaskTest {
     @Test
     public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         assertTrue(producer.transactionInitialized());
         assertTrue(producer.transactionInFlight());
@@ -792,6 +793,7 @@ public class StreamTaskTest {
     @Test
     public void 
shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled()
 {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 
0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -806,6 +808,7 @@ public class StreamTaskTest {
     @Test
     public void 
shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
         task.suspend();
 
         assertTrue(producer.transactionCommitted());
@@ -828,6 +831,7 @@ public class StreamTaskTest {
     @Test
     public void shouldStartNewTransactionOnResumeIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 
0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -835,6 +839,7 @@ public class StreamTaskTest {
         task.suspend();
 
         task.resume();
+        task.initializeTopology();
         assertTrue(producer.transactionInFlight());
     }
 
@@ -854,6 +859,7 @@ public class StreamTaskTest {
     @Test
     public void shouldStartNewTransactionOnCommitIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 
0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -878,6 +884,7 @@ public class StreamTaskTest {
     @Test
     public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
         task.close(false, false);
         task = null;
 

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.

Reply via email to