This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2519e4af0c1 KAFKA-18038: fix flakey test 
StreamThreadTest.shouldLogAndRecordSkippedRecordsForInvalidTimestamps (#17889)
2519e4af0c1 is described below

commit 2519e4af0c19d2540093c283f14dfe4111a5a21e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Nov 21 11:42:28 2024 -0800

    KAFKA-18038: fix flakey test 
StreamThreadTest.shouldLogAndRecordSkippedRecordsForInvalidTimestamps (#17889)
    
    With KAFKA-17872, we changed some internals that affect the conditions
    of this test, introducing a race condition when the expected log
    messages are printed.
    
    This PR adds additional wait-conditions to the test to close the race
    condition.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../processor/internals/StreamThreadTest.java      | 31 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 42b90bf3ec6..7af603b3239 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.producer.MockProducer;
@@ -141,6 +142,7 @@ import static 
org.apache.kafka.streams.processor.internals.ClientUtils.adminClie
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.startsWith;
@@ -262,7 +264,8 @@ public class StreamThreadTest {
             mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName()),
             mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName()),
             mkEntry(InternalConfig.STATE_UPDATER_ENABLED, 
Boolean.toString(stateUpdaterEnabled)),
-            mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, 
Boolean.toString(processingThreadsEnabled))
+            mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, 
Boolean.toString(processingThreadsEnabled)),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1")
         ));
     }
 
@@ -2979,7 +2982,11 @@ public class StreamThreadTest {
 
     @ParameterizedTest
     @MethodSource("data")        
-    public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
+    public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(
+        final boolean stateUpdaterEnabled,
+        final boolean processingThreadsEnabled
+    ) throws Exception {
+
         internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
 
         final Properties properties = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
@@ -3013,12 +3020,20 @@ public class StreamThreadTest {
             addRecord(mockConsumer, ++offset);
             runOnce(processingThreadsEnabled);
 
+            if (processingThreadsEnabled) {
+                waitForCommit(mockConsumer, offset + 1);
+            }
+
             addRecord(mockConsumer, ++offset);
             addRecord(mockConsumer, ++offset);
             addRecord(mockConsumer, ++offset);
             addRecord(mockConsumer, ++offset);
             runOnce(processingThreadsEnabled);
 
+            if (processingThreadsEnabled) {
+                waitForCommit(mockConsumer, offset + 1);
+            }
+
             addRecord(mockConsumer, ++offset, 1L);
             addRecord(mockConsumer, ++offset, 1L);
             runOnce(processingThreadsEnabled);
@@ -3059,6 +3074,18 @@ public class StreamThreadTest {
         }
     }
 
+    private void waitForCommit(final MockConsumer<byte[], byte[]> 
mockConsumer, final long expectedOffset) throws Exception {
+        waitForCondition(() -> {
+                mockTime.sleep(10L);
+                runOnce(true);
+                final Map<TopicPartition, OffsetAndMetadata> committed = 
mockConsumer.committed(Collections.singleton(t1p1));
+                return !committed.isEmpty() && committed.get(t1p1).offset() == 
expectedOffset;
+            },
+            "Never committed offset " + expectedOffset
+        );
+
+    }
+
     @ParameterizedTest
     @MethodSource("data")        
     public void shouldTransmitTaskManagerMetrics(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {

Reply via email to