[
https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508805#comment-16508805
]
ASF GitHub Bot commented on KAFKA-6906:
---------------------------------------
mjsax closed pull request #5105: KAFKA-6906: Fixed to commit transactions if
data is produced via wall clock punctuation
URL: https://github.com/apache/kafka/pull/5105
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would not otherwise display it):
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e2be3e29172..4cea5280f86 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -424,17 +424,22 @@ private void commitOffsets(final boolean
startNewTransaction) {
if (eosEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
- producer.commitTransaction();
- transactionInFlight = false;
- if (startNewTransaction) {
- producer.beginTransaction();
- transactionInFlight = true;
- }
} else {
consumer.commitSync(consumedOffsetsAndMetadata);
}
commitOffsetNeeded = false;
- } else if (eosEnabled && !startNewTransaction &&
transactionInFlight) { // need to make sure to commit txn for suspend case
+ }
+
+ if (eosEnabled) {
+ producer.commitTransaction();
+ transactionInFlight = false;
+ if (startNewTransaction) {
+ producer.beginTransaction();
+ transactionInFlight = true;
+ }
+ }
+
+ if (eosEnabled && !startNewTransaction && transactionInFlight) {
// need to make sure to commit txn for suspend case
producer.commitTransaction();
transactionInFlight = false;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 5537335b221..bfbb2a00270 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,6 +35,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -913,6 +914,7 @@ public void
shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
@Test
public void shouldCloseProducerOnCloseWhenEosEnabled() {
task = createStatelessTask(createConfig(true));
+ task.initializeTopology();
task.close(true, false);
task = null;
@@ -1028,6 +1030,39 @@ public void
shouldReturnOffsetsForRepartitionTopicsForPurging() {
assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
}
+ @Test
+ public void
shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+ task = createStatelessTask(createConfig(true));
+ try {
+ task.close(true, false);
+ fail("should have throw IllegalStateException");
+ } catch (final IllegalStateException expected) {
+ // pass
+ }
+ task = null;
+
+ assertTrue(producer.closed());
+ }
+
+ @Test
+ public void shouldAlwaysCommitIfEosEnabled() {
+ final RecordCollectorImpl recordCollector = new
RecordCollectorImpl(producer, "StreamTask",
+ new LogContext("StreamTaskTest "), new
DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
+
+ task = createStatelessTask(createConfig(true));
+ task.initializeStateStores();
+ task.initializeTopology();
+ task.punctuate(processorSystemTime, 5,
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+ @Override
+ public void punctuate(final long timestamp) {
+ recordCollector.send("result-topic1", 3, 5, null, 0,
time.milliseconds(),
+ new IntegerSerializer(), new IntegerSerializer());
+ }
+ });
+ task.commit();
+ assertEquals(1, producer.history().size());
+ }
+
private StreamTask createStatefulTask(final StreamsConfig config, final
boolean logged) {
final ProcessorTopology topology = ProcessorTopology.with(
Utils.<ProcessorNode>mkList(source1, source2),
@@ -1144,5 +1179,4 @@ protected void flushState() {
recordValue
);
}
-
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 936c67b8ef8..1cc9c06c5b3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -660,7 +660,7 @@ public void
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh
new TestCondition() {
@Override
public boolean conditionMet() {
- return producer.commitCount() == 1;
+ return producer.commitCount() == 2;
}
},
"StreamsThread did not commit transaction.");
@@ -681,7 +681,7 @@ public boolean conditionMet() {
},
"StreamsThread did not remove fenced zombie task.");
- assertThat(producer.commitCount(), equalTo(1L));
+ assertThat(producer.commitCount(), equalTo(2L));
}
@Test
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Kafka Streams does not commit transactions if data is produced via wall-clock
> punctuation
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-6906
> URL: https://issues.apache.org/jira/browse/KAFKA-6906
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Matthias J. Sax
> Assignee: Jagadesh Adireddi
> Priority: Major
>
> Committing in Kafka Streams happens at regular intervals. However, committing
> only happens if new input records were processed since the last commit (via
> setting the flag `commitOffsetNeeded` within `StreamTask#process()`).
> However, data could also be emitted via wall-clock based punctuation calls.
> Especially if EOS is enabled, this is an issue (maybe also for non-EOS),
> because the currently running transaction is not committed and thus might
> time out, leading to a fatal error.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)