This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1016425e9f8 IGNITE-18249 Check for CDC consumer liveness added (#10398)
1016425e9f8 is described below

commit 1016425e9f82bc0cd10d41982be452b8abad4247
Author: Nikolay <[email protected]>
AuthorDate: Fri Nov 25 16:54:02 2022 +0300

    IGNITE-18249 Check for CDC consumer liveness added (#10398)
---
 .../core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java  | 10 ++++++++++
 .../src/main/java/org/apache/ignite/internal/cdc/CdcMain.java  |  6 ++++++
 .../org/apache/ignite/internal/cdc/WalRecordsConsumer.java     | 10 ++++++++++
 3 files changed, 26 insertions(+)

diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
index 8e14dfd1e5d..0715757aff5 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
@@ -138,4 +138,14 @@ public interface CdcConsumer {
      * This method can be invoked only after {@link #start(MetricRegistry)}.
      */
     public void stop();
+
+    /**
+     * Checks that consumer still alive.
+     * This method helps to determine {@link CdcConsumer} errors in case {@link CdcEvent} is rare or source cluster is down.
+     *
+     * @return {@code True} in case consumer alive, {@code false} otherwise.
+     */
+    public default boolean alive() {
+        return true;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 91abaa7faa1..2f5cdfd252f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -418,6 +418,12 @@ public class CdcMain implements Runnable {
             AtomicLong lastSgmnt = new AtomicLong(-1);
 
             while (!stopped) {
+                if (!consumer.alive()) {
+                    log.warning("Consumer is not alive. Ignite Change Data Capture Application will be stopped.");
+
+                    return;
+                }
+
                 try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
                     Set<Path> exists = new HashSet<>();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 1a752970bca..4fd7421afd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -202,6 +202,16 @@ public class WalRecordsConsumer<K, V> {
             log.info("WalRecordsConsumer stopped [consumer=" + consumer.getClass() + ']');
     }
 
+    /**
+     * Checks that consumer still alive.
+     * This method helps to determine {@link CdcConsumer} errors in case {@link CdcEvent} is rare or source cluster is down.
+     *
+     * @return {@code True} in case consumer alive, {@code false} otherwise.
+     */
+    public boolean alive() {
+        return consumer.alive();
+    }
+
     /** @return Change Data Capture Consumer. */
     public CdcConsumer consumer() {
         return consumer;

Reply via email to