This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a3facf  IGNITE-18249 Check for CDC consumer liveness added (#189)
2a3facf is described below

commit 2a3facf4dd46f29848f0a57581f3abe38d23abf6
Author: Nikolay <[email protected]>
AuthorDate: Fri Nov 25 16:58:44 2022 +0300

    IGNITE-18249 Check for CDC consumer liveness added (#189)
---
 .../ignite/cdc/IgniteToIgniteCdcStreamer.java      | 35 +++++++++++++++-
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java |  3 --
 .../cdc/thin/IgniteToIgniteClientCdcStreamer.java  | 44 +++++++++++++++++++
 .../cdc/CdcIgniteToIgniteReplicationTest.java      | 49 ++++++++++++++++++++++
 4 files changed, 127 insertions(+), 4 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index 9b7cc7d..146fc52 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.cdc;
 
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
 import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
@@ -28,6 +29,11 @@ import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.lifecycle.LifecycleEventType;
+
+import static org.apache.ignite.lifecycle.LifecycleEventType.AFTER_NODE_STOP;
+import static org.apache.ignite.lifecycle.LifecycleEventType.BEFORE_NODE_STOP;
 
 /**
  * Change Data Consumer that streams all data changes to provided {@link 
#dest} Ignite cluster.
@@ -44,13 +50,16 @@ import org.apache.ignite.lang.IgniteExperimental;
  * @see CacheVersionConflictResolverImpl
  */
 @IgniteExperimental
-public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
+public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer 
implements LifecycleBean {
     /** Destination cluster client configuration. */
     private IgniteConfiguration destIgniteCfg;
 
     /** Destination Ignite cluster client */
     private IgniteEx dest;
 
+    /** Alive flag. */
+    private volatile boolean alive = true;
+
     /** {@inheritDoc} */
     @Override public void start(MetricRegistry mreg) {
         super.start(mreg);
@@ -60,6 +69,20 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
 
         A.notNull(destIgniteCfg, "Destination Ignite configuration.");
 
+        LifecycleBean[] lifecycleBeans = destIgniteCfg.getLifecycleBeans();
+
+        if (lifecycleBeans != null) {
+            LifecycleBean[] newBeans = new LifecycleBean[lifecycleBeans.length 
+ 1];
+
+            System.arraycopy(lifecycleBeans, 0, newBeans, 0, 
lifecycleBeans.length);
+
+            newBeans[lifecycleBeans.length] = this;
+
+            destIgniteCfg.setLifecycleBeans(newBeans);
+        }
+        else
+            destIgniteCfg.setLifecycleBeans(this);
+
         dest = (IgniteEx)Ignition.start(destIgniteCfg);
 
         applier = new CdcEventsIgniteApplier(dest, maxBatchSize, log);
@@ -85,4 +108,14 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
 
         return this;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean alive() {
+        return alive;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onLifecycleEvent(LifecycleEventType evt) throws 
IgniteException {
+        alive = evt != BEFORE_NODE_STOP && evt != AFTER_NODE_STOP;
+    }
 }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index 3ab2b93..a711748 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -147,9 +147,6 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     /** Count of sent mappings. */
     protected AtomicLongMetric mappingsCnt;
 
-    /** Count of metadata updates. */
-    protected byte metaUpdCnt = 0;
-
     /** */
     private List<Future<RecordMetadata>> futs;
 
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
index e62feba..3a6acad 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
@@ -21,6 +21,7 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.cdc.AbstractIgniteCdcStreamer;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
 import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
+import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.internal.binary.BinaryContext;
@@ -46,12 +47,24 @@ import org.apache.ignite.internal.util.typedef.internal.A;
  * @see CacheVersionConflictResolverImpl
  */
 public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer 
{
+    /** Default timeout to refresh "alive" value. */
+    public static final int DFLT_ALIVE_CHECK_TIMEOUT = 5 * 60_000;
+
     /** Ignite thin client configuration. */
     private ClientConfiguration destClientCfg;
 
     /** Ignite thin client. */
     private IgniteClient dest;
 
+    /** Alive flag. */
+    private volatile boolean alive = true;
+
+    /** Time of the last alive check. */
+    private volatile long lastAliveCheck = System.currentTimeMillis();
+
+    /** Timeout to check liveness of Ignite client. */
+    private long aliveCheckTimeout = DFLT_ALIVE_CHECK_TIMEOUT;
+
     /** {@inheritDoc} */
     @Override public void start(MetricRegistry mreg) {
         super.start(mreg);
@@ -87,4 +100,35 @@ public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer {
 
         return this;
     }
+
+    /**
+     * Sets timeout to check aliveness of Ignite client in milliseconds.
+     *
+     * @param aliveCheckTimeout Alive check timeout in milliseconds.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToIgniteClientCdcStreamer setAliveCheckTimeout(long 
aliveCheckTimeout) {
+        this.aliveCheckTimeout = aliveCheckTimeout;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean alive() {
+        long now = System.currentTimeMillis();
+
+        // Update alive value only by timeout.
+        if (now - lastAliveCheck > aliveCheckTimeout) {
+            try {
+                dest.cluster().state();
+
+                lastAliveCheck = now;
+            }
+            catch (ClientException e) {
+                alive = false;
+            }
+        }
+
+        return alive;
+    }
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 8273fc9..5e70188 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -21,15 +21,21 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.junit.Test;
 
+import static 
org.apache.ignite.cdc.AbstractReplicationTest.ClientType.CLIENT_NODE;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /** */
 public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
@@ -82,6 +88,7 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
 
             if (clientType == ClientType.THIN_CLIENT) {
                 streamer = new IgniteToIgniteClientCdcStreamer()
+                    .setAliveCheckTimeout(5_000)
                     .setDestinationClientConfiguration(new 
ClientConfiguration()
                         .setAddresses(hostAddresses(dest)));
             }
@@ -101,4 +108,46 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
             cdc.run();
         });
     }
+
+    /** */
+    @Test
+    public void testCdcStopOnClientNodeCrash() throws Exception {
+        List<IgniteInternalFuture<?>> cdcFuts = 
startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
+
+        if (clientType == CLIENT_NODE) {
+            assertTrue("Waiting for clients to connect", waitForCondition(
+                () -> destCluster[0].cluster().forClients().nodes().size() == 
srcCluster.length,
+                30_000
+            ));
+        }
+        else {
+            assertTrue("Waiting for clients to connect", waitForCondition(
+                () -> {
+                    int cliCnt = 0;
+
+                    for (IgniteEx dest : destCluster) {
+                        SystemView<?> view = 
dest.context().systemView().view(ClientListenerProcessor.CLI_CONN_VIEW);
+
+                        assertNotNull(view);
+
+                        cliCnt += view.size();
+                    }
+
+                    return cliCnt >= srcCluster.length;
+                },
+                30_000
+            ));
+        }
+
+        // Stopping destination cluster. IgniteToIgniteCdcStreamer connected 
to it.
+        for (IgniteEx destIgnite : destCluster)
+            Ignition.stop(destIgnite.name(), true);
+
+        for (IgniteInternalFuture<?> cdcFut : cdcFuts) {
+            assertTrue(
+                "Waiting for clients fail and crash ignite-cdc",
+                waitForCondition(cdcFut::isDone, getTestTimeout())
+            );
+        }
+    }
 }

Reply via email to