This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 93fd570 MINOR: Retry setting aligned time until set (#4893)
93fd570 is described below
commit 93fd5707fab218abb2c9d13b034608f0a606bfc7
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Apr 19 14:51:24 2018 -0400
MINOR: Retry setting aligned time until set (#4893)
In the AbstractResetIntegrationTest we can hit a transient error when
setting the time for the test, where the new time is less than the original
time. For those cases we should catch the exception and retry setting the
time until it succeeds, rather than letting the test fail.
For testing, ran the entire streams test suite.
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang
<[email protected]>
---
.../integration/AbstractResetIntegrationTest.java | 31 ++++++++++++++++------
1 file changed, 23 insertions(+), 8 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 8a82bf9..249e2c3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -16,15 +16,13 @@
*/
package org.apache.kafka.streams.integration;
-import kafka.admin.AdminClient;
-import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -65,6 +63,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import kafka.admin.AdminClient;
+import kafka.tools.StreamsResetter;
+
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -106,11 +107,25 @@ public abstract class AbstractResetIntegrationTest {
kafkaAdminClient = (KafkaAdminClient)
org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
}
- // we align time to seconds to get clean window boundaries and thus
ensure the same result for each run
- // otherwise, input records could fall into different windows for
different runs depending on the initial mock time
- final long alignedTime = (System.currentTimeMillis() / 1000 + 1) *
1000;
- mockTime = cluster.time;
- mockTime.setCurrentTimeMs(alignedTime);
+ boolean timeSet = false;
+ while (!timeSet) {
+ timeSet = setCurrentTime();
+ }
+ }
+
+ private boolean setCurrentTime() {
+ boolean currentTimeSet = false;
+ try {
+ mockTime = cluster.time;
+ // we align time to seconds to get clean window boundaries and
thus ensure the same result for each run
+ // otherwise, input records could fall into different windows for
different runs depending on the initial mock time
+ final long alignedTime = (System.currentTimeMillis() / 1000 + 1) *
1000;
+ mockTime.setCurrentTimeMs(alignedTime);
+ currentTimeSet = true;
+ } catch (final IllegalArgumentException e) {
+ // don't care will retry until set
+ }
+ return currentTimeSet;
}
private void prepareConfigs() {
--
To stop receiving notification emails like this one, please contact
[email protected].