This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c07258e61b IGNITE-20308 Fix ItAbstractDataStreamerTest flakiness
(#2556)
c07258e61b is described below
commit c07258e61b2bfc134c04b3d461dab330d8a509af
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Sep 6 14:26:55 2023 +0300
IGNITE-20308 Fix ItAbstractDataStreamerTest flakiness (#2556)
Wait for streamer completion in all tests to ensure that no updates happen
after test exit, causing conflicts with SQL in `clearTable`.
---
.../ignite/internal/streamer/ItAbstractDataStreamerTest.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index 5243e391e9..575361106d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.streamer;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -139,27 +141,33 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
@Test
public void testAutoFlushByTimer() throws InterruptedException {
RecordView<Tuple> view = this.defaultTable().recordView();
+ CompletableFuture<Void> streamerFut;
try (var publisher = new SubmissionPublisher<Tuple>()) {
var options =
DataStreamerOptions.builder().autoFlushFrequency(100).build();
- view.streamData(publisher, options);
+ streamerFut = view.streamData(publisher, options);
publisher.submit(tuple(1, "foo"));
waitForKey(view, tupleKey(1));
}
+
+ assertThat(streamerFut, willSucceedIn(5, TimeUnit.SECONDS));
}
@Test
public void testAutoFlushDisabled() throws InterruptedException {
RecordView<Tuple> view = this.defaultTable().recordView();
+ CompletableFuture<Void> streamerFut;
try (var publisher = new SubmissionPublisher<Tuple>()) {
var options =
DataStreamerOptions.builder().autoFlushFrequency(-1).build();
- view.streamData(publisher, options);
+ streamerFut = view.streamData(publisher, options);
publisher.submit(tuple(1, "foo"));
assertFalse(waitForCondition(() -> view.get(null, tupleKey(1)) !=
null, 1000));
}
+
+ assertThat(streamerFut, willSucceedIn(5, TimeUnit.SECONDS));
}
@Test