This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b79fe66 IGNITE-19910 Add consumer poll timeout to kafka-cdc (#276)
9b79fe66 is described below

commit 9b79fe6611b8c9dd4f3d9c2323d4f547829e3270
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Thu Jun 13 11:48:54 2024 +0300

    IGNITE-19910 Add consumer poll timeout to kafka-cdc (#276)
---
 .../kafka/AbstractKafkaToIgniteCdcStreamer.java    |  2 ++
 .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java |  8 ++++-
 .../KafkaToIgniteCdcStreamerConfiguration.java     | 22 +++++++++++++
 .../cdc/kafka/KafkaToIgniteMetadataUpdater.java    |  6 +++-
 .../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java  | 38 ++++++++++++++++++++++
 .../kafka/KafkaToIgniteMetadataUpdaterTest.java    |  2 ++
 .../resources/loader/kafka-to-ignite-correct.xml   |  5 +++
 ...ml => kafka-to-ignite-invalid-poll-timeout.xml} |  1 +
 ...=> kafka-to-ignite-invalid-request-timeout.xml} |  1 +
 .../loader/thin/kafka-to-ignite-client-correct.xml |  5 +++
 ...afka-to-ignite-client-invalid-poll-timeout.xml} |  1 +
 ...a-to-ignite-client-invalid-request-timeout.xml} |  1 +
 12 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index 2df4d204..5e12cc43 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -96,6 +96,7 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements Runnable {
             streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
             "The Kafka partitions upper bound must be greater than lower bound.");
         A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
+        A.ensure(streamerCfg.getKafkaConsumerPollTimeout() >= 0, "The Kafka consumer poll timeout cannot be negative.");
         A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must me greater than zero.");
         A.ensure(
             streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
@@ -175,6 +176,7 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements Runnable {
                 caches,
                 streamerCfg.getMaxBatchSize(),
                 streamerCfg.getKafkaRequestTimeout(),
+                streamerCfg.getKafkaConsumerPollTimeout(),
                 metaUpdr,
                 stopped
             );
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index ceb688e9..d113bba7 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -107,6 +107,9 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
     /** The maximum time to complete Kafka related requests, in milliseconds. */
     private final long kafkaReqTimeout;
 
+    /** Consumer poll timeout in milliseconds. */
+    private final long consumerPollTimeout;
+
     /** Metadata updater. */
     private final KafkaToIgniteMetadataUpdater metaUpdr;
 
@@ -132,6 +135,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
      * @param caches Cache ids.
      * @param maxBatchSize Maximum batch size.
      * @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
+     * @param consumerPollTimeout Consumer poll timeout in milliseconds.
      * @param metaUpdr Metadata updater.
      * @param stopped Stopped flag.
      */
@@ -145,6 +149,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
         Set<Integer> caches,
         int maxBatchSize,
         long kafkaReqTimeout,
+        long consumerPollTimeout,
         KafkaToIgniteMetadataUpdater metaUpdr,
         AtomicBoolean stopped
     ) {
@@ -155,6 +160,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
         this.kafkaPartTo = kafkaPartTo;
         this.caches = caches;
         this.kafkaReqTimeout = kafkaReqTimeout;
+        this.consumerPollTimeout = consumerPollTimeout;
         this.metaUpdr = metaUpdr;
         this.stopped = stopped;
         this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
@@ -213,7 +219,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
      * @param cnsmr Data consumer.
      */
     private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
-        ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
+        ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout));
 
         if (log.isInfoEnabled()) {
             log.info(
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
index 24675d43..a2e6e103 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -33,6 +33,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
     /** Default maximum time to complete Kafka related requests, in milliseconds. */
     public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;
 
+    /** Default kafka consumer poll timeout. */
+    public static final long DFLT_KAFKA_CONSUMER_POLL_TIMEOUT = 3_000L;
+
     /** Default {@link #threadCnt} value. */
     public static final int DFLT_THREAD_CNT = 16;
 
@@ -57,6 +60,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
     /** The maximum time to complete Kafka related requests, in milliseconds. */
     private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
 
+    /** Timeout of kafka consumer poll */
+    private long kafkaConsumerPollTimeout = DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
+
     /** Metadata consumer group. */
     private String metadataCnsmrGrp;
 
@@ -171,6 +177,22 @@ public class KafkaToIgniteCdcStreamerConfiguration {
         this.kafkaReqTimeout = kafkaReqTimeout;
     }
 
+    /**
+     * @return The kafka consumer poll timeout in milliseconds.
+     */
+    public long getKafkaConsumerPollTimeout() {
+        return kafkaConsumerPollTimeout;
+    }
+
+    /**
+     * Sets the kafka consumer poll timeout in milliseconds.
+     *
+     * @param timeout Timeout value.
+     */
+    public void setKafkaConsumerPollTimeout(long timeout) {
+        this.kafkaConsumerPollTimeout = timeout;
+    }
+
     /**
      * @return Metadata topic name.
      */
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index 97b5bfed..236dc2f7 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -61,6 +61,9 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, OffsetCommit
     /** The maximum time to complete Kafka related requests, in milliseconds. */
     private final long kafkaReqTimeout;
 
+    /** Consumer poll timeout. */
+    private final long consumerPollTimeout;
+
     /** */
     private final KafkaConsumer<Void, byte[]> cnsmr;
 
@@ -90,6 +93,7 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, OffsetCommit
     ) {
         this.ctx = ctx;
         this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
+        this.consumerPollTimeout = streamerCfg.getKafkaConsumerPollTimeout();
         this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);
 
         Properties kafkaProps = new Properties();
@@ -141,7 +145,7 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, OffsetCommit
         }
 
         while (true) {
-            ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
+            ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout));
 
             if (recs.count() == 0) {
                 if (log.isDebugEnabled())
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 6938448b..918c0f8a 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.cdc.kafka;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -27,6 +29,12 @@ import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 
 /** Tests load {@link KafkaToIgniteCdcStreamer} from Srping xml file. */
 public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
+    /** Constant to reference from xml config. */
+    public static final int TEST_KAFKA_CONSUMER_POLL_TIMEOUT = 2000;
+
+    /** Constant to reference from xml config. */
+    public static final int TEST_KAFKA_REQUEST_TIMEOUT = 6000;
+
     /** */
     @Test
     public void testLoadConfig() throws Exception {
@@ -85,6 +93,36 @@ public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
         assertNotNull(streamer);
     }
 
+    /** Tests setting timeout properties of kafka to ignite loaders. */
+    @Test
+    public void testLoadTimeoutProperties() throws Exception {
+        Stream.of(
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml",
+                "Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."},
+            new String[] {
+                "loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml",
+                "Ouch! Argument is invalid: The Kafka request timeout cannot be negative."
+            },
+            new String[] {
+                "loader/kafka-to-ignite-invalid-poll-timeout.xml",
+                "Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."},
+            new String[] {
+                "loader/kafka-to-ignite-invalid-request-timeout.xml",
+                "Ouch! Argument is invalid: The Kafka request timeout cannot be negative."
+            }
+        ).forEach(args -> assertThrows(null, () -> loadKafkaToIgniteStreamer(args[0]), IllegalArgumentException.class, args[1]));
+
+        Stream.<AbstractKafkaToIgniteCdcStreamer>of(
+            loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-correct.xml"),
+            loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml")
+        ).forEach(streamer -> {
+            assertNotNull(streamer);
+            assertEquals(TEST_KAFKA_CONSUMER_POLL_TIMEOUT, streamer.streamerCfg.getKafkaConsumerPollTimeout());
+            assertEquals(TEST_KAFKA_REQUEST_TIMEOUT, streamer.streamerCfg.getKafkaRequestTimeout());
+        });
+    }
+
     /** */
     @Test
     public void testInitSpringContextOnce() throws Exception {
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
index 26b95d9e..5dc764fe 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
@@ -41,6 +41,7 @@ import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_TOPIC
 import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
 import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
 import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
 import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -184,6 +185,7 @@ public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
         cfg.setKafkaPartsFrom(0);
         cfg.setKafkaPartsTo(DFLT_PARTS);
         cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
+        cfg.setKafkaConsumerPollTimeout(DFLT_KAFKA_CONSUMER_POLL_TIMEOUT);
 
         return cfg;
     }
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
index b6934241..5b8655ab 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -36,11 +36,16 @@
         </property>
     </bean>
 
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
     <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
         <property name="topic" value="ignite" />
         <property name="metadataTopic" value="ignite-metadata" />
         <property name="kafkaPartsFrom" value="0" />
         <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml
similarity index 97%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml
index b6934241..724fe443 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml
@@ -41,6 +41,7 @@
         <property name="metadataTopic" value="ignite-metadata" />
         <property name="kafkaPartsFrom" value="0" />
         <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaConsumerPollTimeout" value="-1"/>
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml
similarity index 97%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml
index b6934241..8723df25 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml
@@ -41,6 +41,7 @@
         <property name="metadataTopic" value="ignite-metadata" />
         <property name="kafkaPartsFrom" value="0" />
         <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" value="-1"/>
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
index caa89488..ba5d179c 100644
--- a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
@@ -31,11 +31,16 @@
         </property>
     </bean>
 
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
+    <util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />
+
     <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
         <property name="topic" value="ignite" />
         <property name="metadataTopic" value="ignite-metadata" />
         <property name="kafkaPartsFrom" value="0" />
         <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" ref="requestTimeout" />
+        <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml
similarity index 96%
copy from modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml
index caa89488..cd0c7864 100644
--- a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml
@@ -36,6 +36,7 @@
         <property name="metadataTopic" value="ignite-metadata" />
         <property name="kafkaPartsFrom" value="0" />
         <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaConsumerPollTimeout" value="-1"/>
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml
similarity index 97%
copy from modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml
index caa89488..3255d7bd 100644
--- a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml
@@ -36,6 +36,7 @@
         <property name="metadataTopic" value="ignite-metadata" />
         <property name="kafkaPartsFrom" value="0" />
         <property name="kafkaPartsTo" value="16" />
+        <property name="kafkaRequestTimeout" value="-1"/>
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />

Reply via email to