This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1016425e9f8 IGNITE-18249 Check for CDC consumer liveness added (#10398)
1016425e9f8 is described below
commit 1016425e9f82bc0cd10d41982be452b8abad4247
Author: Nikolay <[email protected]>
AuthorDate: Fri Nov 25 16:54:02 2022 +0300
IGNITE-18249 Check for CDC consumer liveness added (#10398)
---
.../core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java | 10 ++++++++++
.../src/main/java/org/apache/ignite/internal/cdc/CdcMain.java | 6 ++++++
.../org/apache/ignite/internal/cdc/WalRecordsConsumer.java | 10 ++++++++++
3 files changed, 26 insertions(+)
diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
index 8e14dfd1e5d..0715757aff5 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
@@ -138,4 +138,14 @@ public interface CdcConsumer {
* This method can be invoked only after {@link #start(MetricRegistry)}.
*/
public void stop();
+
+ /**
+ * Checks that consumer still alive.
+ * This method helps to determine {@link CdcConsumer} errors in case
{@link CdcEvent} is rare or source cluster is down.
+ *
+ * @return {@code True} in case consumer alive, {@code false} otherwise.
+ */
+ public default boolean alive() {
+ return true;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 91abaa7faa1..2f5cdfd252f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -418,6 +418,12 @@ public class CdcMain implements Runnable {
AtomicLong lastSgmnt = new AtomicLong(-1);
while (!stopped) {
+ if (!consumer.alive()) {
+ log.warning("Consumer is not alive. Ignite Change Data
Capture Application will be stopped.");
+
+ return;
+ }
+
try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
Set<Path> exists = new HashSet<>();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 1a752970bca..4fd7421afd1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -202,6 +202,16 @@ public class WalRecordsConsumer<K, V> {
log.info("WalRecordsConsumer stopped [consumer=" +
consumer.getClass() + ']');
}
+ /**
+ * Checks that consumer still alive.
+ * This method helps to determine {@link CdcConsumer} errors in case
{@link CdcEvent} is rare or source cluster is down.
+ *
+ * @return {@code True} in case consumer alive, {@code false} otherwise.
+ */
+ public boolean alive() {
+ return consumer.alive();
+ }
+
/** @return Change Data Capture Consumer. */
public CdcConsumer consumer() {
return consumer;