This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2116dd0a1d IGNITE-20182 Sql. Flaky test
ExchangeExecutionTest#racesBetweenRewindAndBatchesFromPreviousRequest (#2534)
2116dd0a1d is described below
commit 2116dd0a1d634d03437a47621958e38f7603f269
Author: korlov42 <[email protected]>
AuthorDate: Mon Sep 4 14:43:56 2023 +0300
IGNITE-20182 Sql. Flaky test
ExchangeExecutionTest#racesBetweenRewindAndBatchesFromPreviousRequest (#2534)
---
.../sql/engine/exec/rel/ExchangeExecutionTest.java | 21 ++++++++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index bef187be4b..b465e1aaca 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -18,12 +18,14 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.util.ArrayDeque;
@@ -215,7 +217,7 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest {
* <p>The test verifies that batches from diagram above are ignored.
*/
@Test
- public void racesBetweenRewindAndBatchesFromPreviousRequest() {
+ public void racesBetweenRewindAndBatchesFromPreviousRequest() throws
InterruptedException {
UUID queryId = UUID.randomUUID();
String dataNode1Name = "DATA_NODE_1";
String dataNode2Name = "DATA_NODE_2";
@@ -257,6 +259,18 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest {
// this is arrow 1 from the sequence
BatchedResult<Object[]> res = await(root.requestNextAsync(2));
+ // We have to make sure that cursor was opened on node 2 before we
proceed with test scenario.
+ // Otherwise there is a chance that rewind will outran scan task
spawned by first request message.
+ // Problematic task sequence looks like this:
+ // [taskId: 1, request N rows on Node 2]
+ // [taskId: 2, rewind and request another N rows on Node2]
+ // [taskId: 3, <spawned by taskId: 1> scan the iterator and
emit rows on Node 2]
+ //
+ // This is not a problem for general query execution since rewind
will clean up state and drop
+ // all collected rows so far, but TestDataProvider mutates its
state on every invoke of `iterator()`
+ // method, and this test verify certain invariants relates to this
mutation
+ assertTrue(waitForCondition(() ->
node2DataProvider.awaitingResume() > 0, 1_000));
+
assertThat(res.items(), hasSize(1));
assertThat(res.items().get(0), equalTo(new Object[]{1, 0}));
@@ -671,6 +685,11 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest {
}
}
+ /** Returns approximate number of threads awaiting this data provider
to be {@link #resume() resumed}. */
+ int awaitingResume() {
+ return lock.getQueueLength();
+ }
+
/** {@inheritDoc} */
@Override
public Iterator<Object[]> iterator() {