This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 93fd570  MINOR: Retry setting aligned time until set (#4893)
93fd570 is described below

commit 93fd5707fab218abb2c9d13b034608f0a606bfc7
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Apr 19 14:51:24 2018 -0400

    MINOR: Retry setting aligned time until set (#4893)
    
    In the AbstractResetIntegrationTest we can have a transient error when 
setting the time for the test where the new time is less than the original 
time, for those cases we should catch the exception and re-try setting the time 
once versus letting the test fail.
    
    For testing, ran the entire streams test suite.
    
    Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../integration/AbstractResetIntegrationTest.java  | 31 ++++++++++++++++------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 8a82bf9..249e2c3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -16,15 +16,13 @@
  */
 package org.apache.kafka.streams.integration;
 
-import kafka.admin.AdminClient;
-import kafka.tools.StreamsResetter;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -65,6 +63,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import kafka.admin.AdminClient;
+import kafka.tools.StreamsResetter;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -106,11 +107,25 @@ public abstract class AbstractResetIntegrationTest {
             kafkaAdminClient = (KafkaAdminClient) 
org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
         }
 
-        // we align time to seconds to get clean window boundaries and thus 
ensure the same result for each run
-        // otherwise, input records could fall into different windows for 
different runs depending on the initial mock time
-        final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 
1000;
-        mockTime = cluster.time;
-        mockTime.setCurrentTimeMs(alignedTime);
+        boolean timeSet = false;
+        while (!timeSet) {
+            timeSet = setCurrentTime();
+        }
+    }
+
+    private boolean setCurrentTime() {
+        boolean currentTimeSet = false;
+        try {
+            mockTime = cluster.time;
+            // we align time to seconds to get clean window boundaries and 
thus ensure the same result for each run
+            // otherwise, input records could fall into different windows for 
different runs depending on the initial mock time
+            final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 
1000;
+            mockTime.setCurrentTimeMs(alignedTime);
+            currentTimeSet = true;
+        } catch (final IllegalArgumentException e) {
+            // don't care will retry until set
+        }
+        return currentTimeSet;
     }
 
     private void prepareConfigs() {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to