This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cd2f5f0  KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)
cd2f5f0 is described below

commit cd2f5f030b353dfcaa7c3c8e18f69c681ea8e0f1
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Mon Jul 6 14:04:36 2020 -0700

    KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest (#8963)
    
    The current failures we're seeing with this test are due to faulty 
assumptions that it makes and not any real bug in eos-beta (at least, from what 
I've seen so far).
    
    The test relies on tightly controlling the commits, which it does by 
setting the commit interval to MAX_VALUE and manually requesting commits on the 
context. In two phases, the test assumes that any pending data will be 
committed after a rebalance. But we actually take care to avoid unnecessary 
commits: with eos-alpha, we only commit the tasks that are revoked, while with 
eos-beta we must commit all tasks if any are revoked, but only if the revoked 
tasks themselves need a commit.
    
    The failure we see occurs when we try to verify the committed data after a 
second client is started and the group rebalances. The already-running client 
has to give up two tasks to the newly started client, but those tasks may not 
need to be committed, in which case none of the tasks would be committed. So we 
can still have an open transaction on the partitions where we try to read 
committed data.
    
    Reviewers: John Roesler <j...@confluent.io>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../integration/EosBetaUpgradeIntegrationTest.java | 49 ++++++++++++++++++++--
 .../integration/utils/IntegrationTestUtils.java    |  2 +-
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index 6a550e7..cd57acb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Partitioner;
@@ -42,7 +43,10 @@ import 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAss
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -79,6 +83,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -147,6 +152,27 @@ public class EosBetaUpgradeIntegrationTest {
     private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
     private final AtomicInteger commitRequested = new AtomicInteger(0);
 
+    // Note: this pattern only works when we just have a single instance 
running with a single thread
+    // If we want to extend the test or reuse this CommitPunctuator we should 
tighten it up
+    private final AtomicBoolean requestCommit = new AtomicBoolean(false);
+    private static class CommitPunctuator implements Punctuator {
+        final ProcessorContext context;
+        final AtomicBoolean requestCommit;
+
+        public CommitPunctuator(final ProcessorContext context, final 
AtomicBoolean requestCommit) {
+            this.context = context;
+            this.requestCommit = requestCommit;
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            if (requestCommit.get()) {
+                context.commit();
+                requestCommit.set(false);
+            }
+        }
+    }
+
     private Throwable uncaughtException;
 
     private int testNumber = 0;
@@ -401,6 +427,8 @@ public class EosBetaUpgradeIntegrationTest {
             //   p-1: 10 rec + C + 5 rec + A + 5 rec ---> C
             //   p-2: 10 rec + C + 5 rec ---> C
             //   p-3: 10 rec + C + 5 rec ---> C
+            requestCommit.set(true);
+            waitForCondition(() -> !requestCommit.get(), "Punctuator did not 
request commit for running client");
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -731,6 +759,8 @@ public class EosBetaUpgradeIntegrationTest {
             //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec ---> C
             //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 4 rec + A + 5 rec ---> C
             //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 5 rec + A + 5 rec ---> C
+            requestCommit.set(true);
+            waitForCondition(() -> !requestCommit.get(), "Punctuator did not 
request commit for running client");
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -824,6 +854,7 @@ public class EosBetaUpgradeIntegrationTest {
                     KeyValueStore<Long, Long> state = null;
                     AtomicBoolean crash;
                     AtomicInteger sharedCommit;
+                    Cancellable punctuator;
 
                     @Override
                     public void init(final ProcessorContext context) {
@@ -837,6 +868,11 @@ public class EosBetaUpgradeIntegrationTest {
                             crash = errorInjectedClient2;
                             sharedCommit = commitCounterClient2;
                         }
+                        punctuator = context.schedule(
+                            Duration.ofMillis(100),
+                            PunctuationType.WALL_CLOCK_TIME,
+                            new CommitPunctuator(context, requestCommit)
+                        );
                     }
 
                     @Override
@@ -869,7 +905,9 @@ public class EosBetaUpgradeIntegrationTest {
                     }
 
                     @Override
-                    public void close() { }
+                    public void close() {
+                        punctuator.cancel();
+                    }
                 };
             } }, storeNames)
             .to(MULTI_PARTITION_OUTPUT_TOPIC);
@@ -1036,9 +1074,12 @@ public class EosBetaUpgradeIntegrationTest {
         return expectedResult;
     }
 
-    private Set<Long> keysFromInstance(final KafkaStreams streams) {
-        final ReadOnlyKeyValueStore<Long, Long> store =
-            streams.store(StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore()));
+    private Set<Long> keysFromInstance(final KafkaStreams streams) throws 
Exception {
+        final ReadOnlyKeyValueStore<Long, Long> store = getStore(
+            MAX_WAIT_TIME_MS,
+            streams,
+            StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore())
+        );
         final Set<Long> keys = new HashSet<>();
         try (final KeyValueIterator<Long, Long> it = store.all()) {
             while (it.hasNext()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index b8af7e9..d8fb5d2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1273,7 +1273,7 @@ public class IntegrationTestUtils {
          */
         public void waitForNextStableAssignment(final long maxWaitMs) throws 
InterruptedException {
             waitForCondition(
-                () -> nextExpectedNumStableAssignments == 
numStableAssignments(),
+                () -> numStableAssignments() >= 
nextExpectedNumStableAssignments,
                 maxWaitMs,
                 () -> "Client did not reach " + 
nextExpectedNumStableAssignments + " stable assignments on time, " +
                     "numStableAssignments was " + numStableAssignments()

Reply via email to