This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2519e4af0c1 KAFKA-18038: fix flaky test
StreamThreadTest.shouldLogAndRecordSkippedRecordsForInvalidTimestamps (#17889)
2519e4af0c1 is described below
commit 2519e4af0c19d2540093c283f14dfe4111a5a21e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Nov 21 11:42:28 2024 -0800
KAFKA-18038: fix flaky test
StreamThreadTest.shouldLogAndRecordSkippedRecordsForInvalidTimestamps (#17889)
With KAFKA-17872, we changed some internals that affect the conditions
of this test, introducing a race condition in when the expected log
messages are printed.
This PR adds additional wait-conditions to the test to close the race
condition.
Reviewers: Bill Bejeck <[email protected]>
---
.../processor/internals/StreamThreadTest.java | 31 ++++++++++++++++++++--
1 file changed, 29 insertions(+), 2 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 42b90bf3ec6..7af603b3239 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
@@ -141,6 +142,7 @@ import static
org.apache.kafka.streams.processor.internals.ClientUtils.adminClie
import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.startsWith;
@@ -262,7 +264,8 @@ public class StreamThreadTest {
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class.getName()),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class.getName()),
mkEntry(InternalConfig.STATE_UPDATER_ENABLED,
Boolean.toString(stateUpdaterEnabled)),
- mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED,
Boolean.toString(processingThreadsEnabled))
+ mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED,
Boolean.toString(processingThreadsEnabled)),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1")
));
}
@@ -2979,7 +2982,11 @@ public class StreamThreadTest {
@ParameterizedTest
@MethodSource("data")
- public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
+ public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(
+ final boolean stateUpdaterEnabled,
+ final boolean processingThreadsEnabled
+ ) throws Exception {
+
internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);
final Properties properties = configProps(false, stateUpdaterEnabled,
processingThreadsEnabled);
@@ -3013,12 +3020,20 @@ public class StreamThreadTest {
addRecord(mockConsumer, ++offset);
runOnce(processingThreadsEnabled);
+ if (processingThreadsEnabled) {
+ waitForCommit(mockConsumer, offset + 1);
+ }
+
addRecord(mockConsumer, ++offset);
addRecord(mockConsumer, ++offset);
addRecord(mockConsumer, ++offset);
addRecord(mockConsumer, ++offset);
runOnce(processingThreadsEnabled);
+ if (processingThreadsEnabled) {
+ waitForCommit(mockConsumer, offset + 1);
+ }
+
addRecord(mockConsumer, ++offset, 1L);
addRecord(mockConsumer, ++offset, 1L);
runOnce(processingThreadsEnabled);
@@ -3059,6 +3074,18 @@ public class StreamThreadTest {
}
}
+ private void waitForCommit(final MockConsumer<byte[], byte[]>
mockConsumer, final long expectedOffset) throws Exception {
+ waitForCondition(() -> {
+ mockTime.sleep(10L);
+ runOnce(true);
+ final Map<TopicPartition, OffsetAndMetadata> committed =
mockConsumer.committed(Collections.singleton(t1p1));
+ return !committed.isEmpty() && committed.get(t1p1).offset() ==
expectedOffset;
+ },
+ "Never committed offset " + expectedOffset
+ );
+
+ }
+
@ParameterizedTest
@MethodSource("data")
public void shouldTransmitTaskManagerMetrics(final boolean
stateUpdaterEnabled, final boolean processingThreadsEnabled) {