This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 821275e6b3 KAFKA-10405: Set purge interval explicitly in
PurgeRepartitionTopicIntegrationTest (#11948)
821275e6b3 is described below
commit 821275e6b3af146de8bab65107985c65c863f09f
Author: Luke Chen <[email protected]>
AuthorDate: Sat Mar 26 00:30:02 2022 +0800
KAFKA-10405: Set purge interval explicitly in
PurgeRepartitionTopicIntegrationTest (#11948)
In KIP-811, we added a new config repartition.purge.interval.ms to set
repartition purge interval. In this flaky test, we expected the purge interval
is the same as commit interval, which is not correct anymore (default is 30
sec). Set the purge interval explicitly to fix this issue.
Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang
<[email protected]>
---
.../streams/integration/PurgeRepartitionTopicIntegrationTest.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index ffb35312ea..37d8743521 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -161,6 +161,7 @@ public class PurgeRepartitionTopicIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
APPLICATION_ID);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
PURGE_INTERVAL_MS);
+
streamsConfiguration.put(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG,
PURGE_INTERVAL_MS);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
@@ -203,10 +204,11 @@ public class PurgeRepartitionTopicIntegrationTest {
TestUtils.waitForCondition(new
RepartitionTopicCreatedWithExpectedConfigs(), 60000,
"Repartition topic " + REPARTITION_TOPIC + " not created with
the expected configs after 60000 ms.");
+ // wait until we received more than 1 segment of data, so that we can
confirm the purge succeeds in next verification
TestUtils.waitForCondition(
- new RepartitionTopicVerified(currentSize -> currentSize > 0),
+ new RepartitionTopicVerified(currentSize -> currentSize >
PURGE_SEGMENT_BYTES),
60000,
- "Repartition topic " + REPARTITION_TOPIC + " not received data
after 60000 ms."
+ "Repartition topic " + REPARTITION_TOPIC + " not received more
than " + PURGE_SEGMENT_BYTES + "B of data after 60000 ms."
);
// we need long enough timeout to by-pass the log manager's
InitialTaskDelayMs, which is hard-coded on server side