[ https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397130 ]

ASF GitHub Bot commented on KAFKA-6634:
---------------------------------------

guozhangwang closed pull request #4684: KAFKA-6634: Delay starting new transaction in task.initializeTopology
URL: https://github.com/apache/kafka/pull/4684
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8529c9eca88..c806bfde47e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -90,6 +90,7 @@ void addNewTask(final T task) {
      * @return partitions that are ready to be resumed
     * @throws IllegalStateException If store gets registered after initialized is already finished
     * @throws StreamsException if the store's change log does not contain the partition
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     Set<TopicPartition> initializeNewTasks() {
         final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -240,18 +241,21 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
+                task.resume();
                 try {
-                    task.resume();
+                    transitionToRunning(task, new HashSet<TopicPartition>());
                 } catch (final TaskMigratedException e) {
+                    // we need to catch migration exception internally since this function
+                    // is triggered in the rebalance callback
                     log.info("Failed to resume {} {} since it got migrated to another thread already. " +
                             "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                     final RuntimeException fatalException = closeZombieTask(task);
+                    running.remove(task.id());
                     if (fatalException != null) {
                         throw fatalException;
                     }
                     throw e;
                 }
-                transitionToRunning(task, new HashSet<TopicPartition>());
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
@@ -271,6 +275,9 @@ private void addToRestoring(final T task) {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b8777ad5521..8d6e56a17aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ void removeAllSensors() {
     * @param cache                 the {@link ThreadCache} created by the thread
     * @param time                  the system {@link Time} of the thread
     * @param producer              the instance of {@link Producer} used to produce records
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
                       final Collection<TopicPartition> partitions,
@@ -149,14 +148,11 @@ public StreamTask(final TaskId id,
         partitionGroup = new PartitionGroup(partitionQueues);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // initialize transactions if eos is turned on, which will block if the previous transaction has not
+        // completed yet; do not start the first transaction until the topology has been initialized later
         if (eosEnabled) {
-            try {
-                this.producer.initTransactions();
-                this.producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
+            this.producer.initTransactions();
         }
     }
 
@@ -167,31 +163,38 @@ public boolean initializeStateStores() {
         return changelogPartitions().isEmpty();
     }
 
+    /**
+     * <pre>
+     * - (re-)initialize the topology of the task
+     * </pre>
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     @Override
     public void initializeTopology() {
         initTopology();
+
+        if (eosEnabled) {
+            try {
+                this.producer.beginTransaction();
+            } catch (final ProducerFencedException fatal) {
+                throw new TaskMigratedException(this, fatal);
+            }
+            transactionInFlight = true;
+        }
+
         processorContext.initialized();
         taskInitialized = true;
     }
 
     /**
      * <pre>
-     * - re-initialize the task
-     * - if (eos) begin new transaction
+     * - resume the task
      * </pre>
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @Override
     public void resume() {
+        // nothing to do; new transaction will be started only after topology is initialized
         log.debug("Resuming");
-        if (eosEnabled) {
-            try {
-                producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
-        }
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e1c45b4d8bd..9bbc0da26a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -346,9 +346,6 @@ public StateDirectory stateDirectory() {
             return stateDirectory;
         }
 
-        /**
-         * @throws TaskMigratedException if the task producer got fenced (EOS only)
-         */
        Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
            final List<T> createdTasks = new ArrayList<>();
            for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
@@ -401,9 +398,6 @@ public void close() {}
             this.threadClientId = threadClientId;
         }
 
-        /**
-         * @throws TaskMigratedException if the task producer got fenced (EOS only)
-         */
         @Override
        StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
             taskCreatedSensor.record();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9f02834dd75..80df5179a37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -94,9 +94,6 @@
         this.adminClient = adminClient;
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     void createTasks(final Collection<TopicPartition> assignment) {
         if (consumer == null) {
            throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
@@ -114,9 +111,6 @@ void createTasks(final Collection<TopicPartition> assignment) {
         consumer.pause(partitions);
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
         if (assignedActiveTasks.isEmpty()) {
             return;
@@ -156,9 +150,6 @@ private void addStreamTasks(final Collection<TopicPartition> assignment) {
         }
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStandbyTasks() {
        final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.assignedStandbyTasks;
         if (assignedStandbyTasks.isEmpty()) {
@@ -173,7 +164,6 @@ private void addStandbyTasks() {
             if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
                 newStandbyTasks.put(taskId, partitions);
             }
-
         }
 
         if (newStandbyTasks.isEmpty()) {
@@ -319,7 +309,7 @@ void setConsumer(final Consumer<byte[], byte[]> consumer) {
     /**
     * @throws IllegalStateException If store gets registered after initialized is already finished
     * @throws StreamsException if the store's change log does not contain the partition
-     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
+     * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
         final Set<TopicPartition> resumed = active.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 4bb7828fe25..246d0477eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -256,9 +256,10 @@ public void shouldResumeMatchingSuspendedTasks() {
     }
 
     @Test
-    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+    public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
         mockRunningTaskSuspension();
         t1.resume();
+        t1.initializeTopology();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
         t1.close(false, true);
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1165d76cf2e..a30582905de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -776,6 +776,7 @@ public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAll
     @Test
     public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         assertTrue(producer.transactionInitialized());
         assertTrue(producer.transactionInFlight());
@@ -792,6 +793,7 @@ public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
     @Test
     public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -806,6 +808,7 @@ public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSusp
     @Test
     public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
         task.suspend();
 
         assertTrue(producer.transactionCommitted());
@@ -828,6 +831,7 @@ public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSusp
     @Test
     public void shouldStartNewTransactionOnResumeIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -835,6 +839,7 @@ public void shouldStartNewTransactionOnResumeIfEosEnabled() {
         task.suspend();
 
         task.resume();
+        task.initializeTopology();
         assertTrue(producer.transactionInFlight());
     }
 
@@ -854,6 +859,7 @@ public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
     @Test
     public void shouldStartNewTransactionOnCommitIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -878,6 +884,7 @@ public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
     @Test
     public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
         task.close(false, false);
         task = null;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Delay initiating the txn on producers until initializeTopology with EOS turned on
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-6634
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6634
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> In the Streams EOS implementation, the producer created for each task starts a 
> txn immediately in the constructor of `StreamTask`. However, because of the 
> restoration process, the task may not process any data, and hence the producer 
> may not send any records for that started txn, for a long time. With the 
> default txn.session.timeout of 60 seconds, this means that if restoration 
> takes longer than that, the producer will immediately get an error that its 
> producer epoch is already old once it starts sending.
> To fix this, we should start the txn only after the restoration phase is done. 
> The caveat is that if the producer has already been fenced, it will not be 
> notified of this until then, in initializeTopology. But this should not be a 
> correctness issue, since during the restoration process we do not make any 
> changes to the processing state.
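
For illustration, a minimal self-contained sketch of the pattern the patch adopts
(EosTask is a hypothetical stand-in for StreamTask and the exception wrapping is
simplified; only the producer calls mirror the diff above):

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.errors.ProducerFencedException;

    class EosTask {
        private final Producer<byte[], byte[]> producer;
        private final boolean eosEnabled;
        private boolean transactionInFlight = false;

        EosTask(final Producer<byte[], byte[]> producer, final boolean eosEnabled) {
            this.producer = producer;
            this.eosEnabled = eosEnabled;
            if (eosEnabled) {
                // may block until the previous transaction completes, but does not
                // begin a new transaction, so no transaction timeout can expire
                // while state restoration is still running
                producer.initTransactions();
            }
        }

        void initializeTopology() {
            if (eosEnabled) {
                try {
                    // restoration has finished by the time this is called, so the
                    // transaction is only open while the task is actually processing
                    producer.beginTransaction();
                } catch (final ProducerFencedException fatal) {
                    // the real patch wraps this as a TaskMigratedException so the
                    // stream thread can close the task as a zombie and rebalance
                    throw new RuntimeException("task migrated to another thread", fatal);
                }
                transactionInFlight = true;
            }
        }
    }

With this change, a producer that was fenced during restoration surfaces the failure
in initializeTopology rather than in the constructor, which is why the @throws
TaskMigratedException documentation moves accordingly in the diff above.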



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
