This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2116dd0a1d IGNITE-20182 Sql. Flaky test 
ExchangeExecutionTest#racesBetweenRewindAndBatchesFromPreviousRequest (#2534)
2116dd0a1d is described below

commit 2116dd0a1d634d03437a47621958e38f7603f269
Author: korlov42 <[email protected]>
AuthorDate: Mon Sep 4 14:43:56 2023 +0300

    IGNITE-20182 Sql. Flaky test 
ExchangeExecutionTest#racesBetweenRewindAndBatchesFromPreviousRequest (#2534)
---
 .../sql/engine/exec/rel/ExchangeExecutionTest.java  | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index bef187be4b..b465e1aaca 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.util.ArrayDeque;
@@ -215,7 +217,7 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest {
      * <p>The test verifies that batches from diagram above are ignored.
      */
     @Test
-    public void racesBetweenRewindAndBatchesFromPreviousRequest() {
+    public void racesBetweenRewindAndBatchesFromPreviousRequest() throws 
InterruptedException {
         UUID queryId = UUID.randomUUID();
         String dataNode1Name = "DATA_NODE_1";
         String dataNode2Name = "DATA_NODE_2";
@@ -257,6 +259,18 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest {
             // this is arrow 1 from the sequence
             BatchedResult<Object[]> res = await(root.requestNextAsync(2));
 
+            // We have to make sure that cursor was opened on node 2 before we 
proceed with test scenario.
+            // Otherwise there is a chance that rewind will outran scan task 
spawned by first request message.
+            // Problematic task sequence looks like this:
+            //      [taskId: 1, request N rows on Node 2]
+            //      [taskId: 2, rewind and request another N rows on Node2]
+            //      [taskId: 3, <spawned by taskId: 1> scan the iterator and 
emit rows on Node 2]
+            //
+            // This is not a problem for general query execution since rewind 
will clean up state and drop
+            // all collected rows so far, but TestDataProvider mutates its 
state on every invoke of `iterator()`
+            // method, and this test verify certain invariants relates to this 
mutation
+            assertTrue(waitForCondition(() -> 
node2DataProvider.awaitingResume() > 0, 1_000));
+
             assertThat(res.items(), hasSize(1));
             assertThat(res.items().get(0), equalTo(new Object[]{1, 0}));
 
@@ -671,6 +685,11 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest {
             }
         }
 
+        /** Returns approximate number of threads awaiting this data provider 
to be {@link #resume() resumed}. */
+        int awaitingResume() {
+            return lock.getQueueLength();
+        }
+
         /** {@inheritDoc} */
         @Override
         public Iterator<Object[]> iterator() {

Reply via email to