This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 2619efcb173 IGNITE-25706 Fail-fast detection for CDC tests (#12151) 2619efcb173 is described below commit 2619efcb1730377ea935d75b9c8eaec5d4bf62c8 Author: Dmitry Werner <grimekil...@gmail.com> AuthorDate: Mon Jul 7 16:05:26 2025 +0500 IGNITE-25706 Fail-fast detection for CDC tests (#12151) --- .../java/org/apache/ignite/cdc/AbstractCdcTest.java | 19 ++++++++++++++++++- .../test/java/org/apache/ignite/cdc/CdcSelfTest.java | 7 ++++--- .../org/apache/ignite/internal/cdc/SqlCdcTest.java | 5 +++-- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index 12206c1ac58..6f118038955 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.CI3; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.spi.metric.HistogramMetric; import org.apache.ignite.spi.metric.LongMetric; @@ -167,7 +168,7 @@ public abstract class AbstractCdcTest extends GridCommonAbstractTest { else cdc = createCdc(cnsmr, cfg); - IgniteInternalFuture<?> fut = runAsync(cdc); + IgniteInternalFuture<?> fut = runCdcAsync(cdc, latch); addData.apply(cache, from, to); @@ -279,6 +280,22 @@ public abstract class AbstractCdcTest extends GridCommonAbstractTest { return null; } + /** */ + protected IgniteInternalFuture<?> runCdcAsync(CdcMain cdc, CountDownLatch latch) { + IgniteInternalFuture<?> fut = runAsync(cdc); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> initFut) { + if (initFut.error() != null) + log.error("The CDC main process has failed", initFut.error()); + + latch.countDown(); + } + }); + + return fut; + } + /** */ public abstract static class TestCdcConsumer<T> implements CdcConsumer { /** Keys. */ diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 967fbc8c25f..1c2132c497b 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -629,14 +629,15 @@ public class CdcSelfTest extends AbstractCdcTest { CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1); CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2); - IgniteInternalFuture<?> fut1 = runAsync(cdc1); - IgniteInternalFuture<?> fut2 = runAsync(cdc2); + IgniteInternalFuture<?> fut1 = runCdcAsync(cdc1, latch); + IgniteInternalFuture<?> fut2 = runCdcAsync(cdc2, latch); addDataFut.get(getTestTimeout()); runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2)).get(getTestTimeout()); - // Wait while predicate will become true and state saved on the disk for both cdc. + // Wait while predicate will become true and state saved on the disk for both cdc or + // while CdcMain futures completes when assertions fail in UserCdcConsumer methods. assertTrue(latch.await(getTestTimeout(), MILLISECONDS)); checkMetrics(cdc1, keysCnt[0]); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java index 8691a0a91d2..3b80ce9f6d2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java @@ -137,7 +137,7 @@ public class SqlCdcTest extends AbstractCdcTest { CdcMain cdc = createCdc(cnsmr, cfg, latch, userPredicate, cityPredicate); - IgniteInternalFuture<?> fut = runAsync(cdc); + IgniteInternalFuture<?> fut = runCdcAsync(cdc, latch); executeSql( ign, @@ -167,7 +167,8 @@ public class SqlCdcTest extends AbstractCdcTest { Integer.toString(127000 + i)); } - // Wait while both predicte will become true and state saved on the disk. + // Wait while both predicate will become true and state saved on the disk or + // while CdcMain future completes when assertions fail in BinaryCdcConsumer methods. assertTrue(latch.await(getTestTimeout(), MILLISECONDS)); checkMetrics(cdc, KEYS_CNT * 2);