This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 821275e6b3 KAFKA-10405: Set purge interval explicitly in 
PurgeRepartitionTopicIntegrationTest (#11948)
821275e6b3 is described below

commit 821275e6b3af146de8bab65107985c65c863f09f
Author: Luke Chen <show...@gmail.com>
AuthorDate: Sat Mar 26 00:30:02 2022 +0800

    KAFKA-10405: Set purge interval explicitly in 
PurgeRepartitionTopicIntegrationTest (#11948)
    
    In KIP-811, we added a new config repartition.purge.interval.ms to set the 
repartition purge interval. This flaky test expected the purge interval to be 
the same as the commit interval, which is no longer the case (the purge interval 
now defaults to 30 sec). Set the purge interval explicitly to fix this issue.
    
    Reviewers: Bruno Cadonna <cado...@apache.org>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../streams/integration/PurgeRepartitionTopicIntegrationTest.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index ffb35312ea..37d8743521 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -161,6 +161,7 @@ public class PurgeRepartitionTopicIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
PURGE_INTERVAL_MS);
+        
streamsConfiguration.put(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, 
PURGE_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
@@ -203,10 +204,11 @@ public class PurgeRepartitionTopicIntegrationTest {
         TestUtils.waitForCondition(new 
RepartitionTopicCreatedWithExpectedConfigs(), 60000,
                 "Repartition topic " + REPARTITION_TOPIC + " not created with 
the expected configs after 60000 ms.");
 
+        // wait until we received more than 1 segment of data, so that we can 
confirm the purge succeeds in next verification
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize > 0),
+            new RepartitionTopicVerified(currentSize -> currentSize > 
PURGE_SEGMENT_BYTES),
             60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not received data 
after 60000 ms."
+            "Repartition topic " + REPARTITION_TOPIC + " not received more 
than " + PURGE_SEGMENT_BYTES + "B of data after 60000 ms."
         );
 
         // we need long enough timeout to by-pass the log manager's 
InitialTaskDelayMs, which is hard-coded on server side

Reply via email to