[ 
https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508805#comment-16508805
 ] 

ASF GitHub Bot commented on KAFKA-6906:
---------------------------------------

mjsax closed pull request #5105: KAFKA-6906: Fixed to commit transactions if 
data is produced via wall clock punctuation
URL: https://github.com/apache/kafka/pull/5105
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e2be3e29172..4cea5280f86 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -424,17 +424,22 @@ private void commitOffsets(final boolean 
startNewTransaction) {
 
                 if (eosEnabled) {
                     
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-                    producer.commitTransaction();
-                    transactionInFlight = false;
-                    if (startNewTransaction) {
-                        producer.beginTransaction();
-                        transactionInFlight = true;
-                    }
                 } else {
                     consumer.commitSync(consumedOffsetsAndMetadata);
                 }
                 commitOffsetNeeded = false;
-            } else if (eosEnabled && !startNewTransaction && 
transactionInFlight) { // need to make sure to commit txn for suspend case
+            }
+
+            if (eosEnabled) {
+                producer.commitTransaction();
+                transactionInFlight = false;
+                if (startNewTransaction) {
+                    producer.beginTransaction();
+                    transactionInFlight = true;
+                }
+            }
+
+            if (eosEnabled && !startNewTransaction && transactionInFlight) { 
// need to make sure to commit txn for suspend case
                 producer.commitTransaction();
                 transactionInFlight = false;
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 5537335b221..bfbb2a00270 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -913,6 +914,7 @@ public void 
shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
     @Test
     public void shouldCloseProducerOnCloseWhenEosEnabled() {
         task = createStatelessTask(createConfig(true));
+        task.initializeTopology();
         task.close(true, false);
         task = null;
 
@@ -1028,6 +1030,39 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
         assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
     }
 
+    @Test
+    public void 
shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+        task = createStatelessTask(createConfig(true));
+        try {
+            task.close(true, false);
+            fail("should have throw IllegalStateException");
+        } catch (final IllegalStateException expected) {
+            // pass
+        }
+        task = null;
+
+        assertTrue(producer.closed());
+    }
+
+    @Test
+    public void shouldAlwaysCommitIfEosEnabled() {
+        final RecordCollectorImpl recordCollector =  new 
RecordCollectorImpl(producer, "StreamTask",
+                new LogContext("StreamTaskTest "), new 
DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
+
+        task = createStatelessTask(createConfig(true));
+        task.initializeStateStores();
+        task.initializeTopology();
+        task.punctuate(processorSystemTime, 5, 
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+            @Override
+            public void punctuate(final long timestamp) {
+                recordCollector.send("result-topic1", 3, 5, null, 0, 
time.milliseconds(),
+                        new IntegerSerializer(),  new IntegerSerializer());
+            }
+        });
+        task.commit();
+        assertEquals(1, producer.history().size());
+    }
+
     private StreamTask createStatefulTask(final StreamsConfig config, final 
boolean logged) {
         final ProcessorTopology topology = ProcessorTopology.with(
             Utils.<ProcessorNode>mkList(source1, source2),
@@ -1144,5 +1179,4 @@ protected void flushState() {
             recordValue
         );
     }
-
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 936c67b8ef8..1cc9c06c5b3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -660,7 +660,7 @@ public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh
             new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    return producer.commitCount() == 1;
+                    return producer.commitCount() == 2;
                 }
             },
             "StreamsThread did not commit transaction.");
@@ -681,7 +681,7 @@ public boolean conditionMet() {
             },
             "StreamsThread did not remove fenced zombie task.");
 
-        assertThat(producer.commitCount(), equalTo(1L));
+        assertThat(producer.commitCount(), equalTo(2L));
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Kafka Streams does not commit transactions if data is produced via wall-clock 
> punctuation
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6906
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6906
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>
> Committing in Kafka Streams happens at regular intervals. However, a commit 
> only happens if new input records were processed since the last commit (this 
> is tracked via the flag `commitOffsetNeeded`, set within `StreamTask#process()`).
> Data can also be emitted via wall-clock-based punctuation calls, without any 
> new input records being processed. This is an issue especially if EOS is 
> enabled (and maybe also for non-EOS), because the currently running 
> transaction is not committed and thus might time out, leading to a fatal error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to