This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 2a3facf IGNITE-18249 Check for CDC consumer liveness added (#189)
2a3facf is described below
commit 2a3facf4dd46f29848f0a57581f3abe38d23abf6
Author: Nikolay <[email protected]>
AuthorDate: Fri Nov 25 16:58:44 2022 +0300
IGNITE-18249 Check for CDC consumer liveness added (#189)
---
.../ignite/cdc/IgniteToIgniteCdcStreamer.java | 35 +++++++++++++++-
.../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 3 --
.../cdc/thin/IgniteToIgniteClientCdcStreamer.java | 44 +++++++++++++++++++
.../cdc/CdcIgniteToIgniteReplicationTest.java | 49 ++++++++++++++++++++++
4 files changed, 127 insertions(+), 4 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index 9b7cc7d..146fc52 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -17,6 +17,7 @@
package org.apache.ignite.cdc;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
@@ -28,6 +29,11 @@ import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.lifecycle.LifecycleEventType;
+
+import static org.apache.ignite.lifecycle.LifecycleEventType.AFTER_NODE_STOP;
+import static org.apache.ignite.lifecycle.LifecycleEventType.BEFORE_NODE_STOP;
/**
* Change Data Consumer that streams all data changes to provided {@link
#dest} Ignite cluster.
@@ -44,13 +50,16 @@ import org.apache.ignite.lang.IgniteExperimental;
* @see CacheVersionConflictResolverImpl
*/
@IgniteExperimental
-public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
+public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer
implements LifecycleBean {
/** Destination cluster client configuration. */
private IgniteConfiguration destIgniteCfg;
/** Destination Ignite cluster client */
private IgniteEx dest;
+ /** Alive flag; becomes {@code false} on a node stop lifecycle event. */
+ private volatile boolean alive = true;
+
/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
super.start(mreg);
@@ -60,6 +69,20 @@ public class IgniteToIgniteCdcStreamer extends
AbstractIgniteCdcStreamer {
A.notNull(destIgniteCfg, "Destination Ignite configuration.");
+ LifecycleBean[] lifecycleBeans = destIgniteCfg.getLifecycleBeans();
+
+ if (lifecycleBeans != null) {
+ LifecycleBean[] newBeans = new LifecycleBean[lifecycleBeans.length
+ 1];
+
+ System.arraycopy(lifecycleBeans, 0, newBeans, 0,
lifecycleBeans.length);
+
+ newBeans[lifecycleBeans.length] = this;
+
+ destIgniteCfg.setLifecycleBeans(newBeans);
+ }
+ else
+ destIgniteCfg.setLifecycleBeans(this);
+
dest = (IgniteEx)Ignition.start(destIgniteCfg);
applier = new CdcEventsIgniteApplier(dest, maxBatchSize, log);
@@ -85,4 +108,14 @@ public class IgniteToIgniteCdcStreamer extends
AbstractIgniteCdcStreamer {
return this;
}
+
+ /** {@inheritDoc} Returns the flag maintained by {@link #onLifecycleEvent(LifecycleEventType)}. */
+ @Override public boolean alive() {
+ return alive;
+ }
+
+ /** {@inheritDoc} Clears the alive flag on BEFORE_NODE_STOP or AFTER_NODE_STOP; any other event sets it to {@code true}. */
+ @Override public void onLifecycleEvent(LifecycleEventType evt) throws
IgniteException {
+ alive = evt != BEFORE_NODE_STOP && evt != AFTER_NODE_STOP;
+ }
}
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index 3ab2b93..a711748 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -147,9 +147,6 @@ public class IgniteToKafkaCdcStreamer implements
CdcConsumer {
/** Count of sent mappings. */
protected AtomicLongMetric mappingsCnt;
- /** Count of metadata updates. */
- protected byte metaUpdCnt = 0;
-
/** */
private List<Future<RecordMetadata>> futs;
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
index e62feba..3a6acad 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
@@ -21,6 +21,7 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.cdc.AbstractIgniteCdcStreamer;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
+import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.binary.BinaryContext;
@@ -46,12 +47,24 @@ import org.apache.ignite.internal.util.typedef.internal.A;
* @see CacheVersionConflictResolverImpl
*/
public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer
{
+ /** Default interval, in milliseconds, between refreshes of the "alive" value (5 minutes). */
+ public static final int DFLT_ALIVE_CHECK_TIMEOUT = 5 * 60_000;
+
/** Ignite thin client configuration. */
private ClientConfiguration destClientCfg;
/** Ignite thin client. */
private IgniteClient dest;
+ /** Alive flag; set to {@code false} when a liveness check of the thin client fails. */
+ private volatile boolean alive = true;
+
+ /** Time of the last successful alive check, in milliseconds; initialized at construction. */
+ private volatile long lastAliveCheck = System.currentTimeMillis();
+
+ /** Minimum interval, in milliseconds, between liveness checks of the Ignite client. */
+ private long aliveCheckTimeout = DFLT_ALIVE_CHECK_TIMEOUT;
+
/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
super.start(mreg);
@@ -87,4 +100,35 @@ public class IgniteToIgniteClientCdcStreamer extends
AbstractIgniteCdcStreamer {
return this;
}
+
+ /**
+ * Sets the minimum interval, in milliseconds, between liveness checks of the Ignite client.
+ *
+ * @param aliveCheckTimeout Alive check timeout in milliseconds.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToIgniteClientCdcStreamer setAliveCheckTimeout(long
aliveCheckTimeout) {
+ this.aliveCheckTimeout = aliveCheckTimeout;
+
+ return this;
+ }
+
+ /** {@inheritDoc} Queries the destination cluster state once the check timeout has elapsed; a {@link ClientException} clears the flag. */
+ @Override public boolean alive() {
+ long now = System.currentTimeMillis();
+
+ // Probe only when the timeout has elapsed since the last successful check; otherwise reuse the stored value.
+ if (now - lastAliveCheck > aliveCheckTimeout) {
+ try {
+ dest.cluster().state();
+
+ lastAliveCheck = now;
+ }
+ catch (ClientException e) {
+ alive = false;
+ }
+ }
+
+ return alive;
+ }
}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 8273fc9..5e70188 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -21,15 +21,21 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
+import org.apache.ignite.Ignition;
import org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.junit.Test;
+import static
org.apache.ignite.cdc.AbstractReplicationTest.ClientType.CLIENT_NODE;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** */
public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
@@ -82,6 +88,7 @@ public class CdcIgniteToIgniteReplicationTest extends
AbstractReplicationTest {
if (clientType == ClientType.THIN_CLIENT) {
streamer = new IgniteToIgniteClientCdcStreamer()
+ .setAliveCheckTimeout(5_000)
.setDestinationClientConfiguration(new
ClientConfiguration()
.setAddresses(hostAddresses(dest)));
}
@@ -101,4 +108,46 @@ public class CdcIgniteToIgniteReplicationTest extends
AbstractReplicationTest {
cdc.run();
});
}
+
+ /** Checks that every ignite-cdc future completes after all destination cluster nodes are stopped. */
+ @Test
+ public void testCdcStopOnClientNodeCrash() throws Exception {
+ List<IgniteInternalFuture<?>> cdcFuts =
startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
+
+ if (clientType == CLIENT_NODE) {
+ assertTrue("Waiting for clients to connect", waitForCondition(
+ () -> destCluster[0].cluster().forClients().nodes().size() ==
srcCluster.length,
+ 30_000
+ ));
+ }
+ else {
+ assertTrue("Waiting for clients to connect", waitForCondition(
+ () -> {
+ int cliCnt = 0;
+
+ for (IgniteEx dest : destCluster) {
+ SystemView<?> view =
dest.context().systemView().view(ClientListenerProcessor.CLI_CONN_VIEW);
+
+ assertNotNull(view);
+
+ cliCnt += view.size();
+ }
+
+ return cliCnt >= srcCluster.length;
+ },
+ 30_000
+ ));
+ }
+
+ // Stopping destination cluster. IgniteToIgniteCdcStreamer connected
to it.
+ for (IgniteEx destIgnite : destCluster)
+ Ignition.stop(destIgnite.name(), true);
+
+ for (IgniteInternalFuture<?> cdcFut : cdcFuts) {
+ assertTrue(
+ "Waiting for clients fail and crash ignite-cdc",
+ waitForCondition(cdcFut::isDone, getTestTimeout())
+ );
+ }
+ }
}