This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 9b79fe66 IGNITE-19910 Add consumer poll timeout to kafka-cdc (#276)
9b79fe66 is described below
commit 9b79fe6611b8c9dd4f3d9c2323d4f547829e3270
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Thu Jun 13 11:48:54 2024 +0300
IGNITE-19910 Add consumer poll timeout to kafka-cdc (#276)
---
.../kafka/AbstractKafkaToIgniteCdcStreamer.java | 2 ++
.../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java | 8 ++++-
.../KafkaToIgniteCdcStreamerConfiguration.java | 22 +++++++++++++
.../cdc/kafka/KafkaToIgniteMetadataUpdater.java | 6 +++-
.../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java | 38 ++++++++++++++++++++++
.../kafka/KafkaToIgniteMetadataUpdaterTest.java | 2 ++
.../resources/loader/kafka-to-ignite-correct.xml | 5 +++
...ml => kafka-to-ignite-invalid-poll-timeout.xml} | 1 +
...=> kafka-to-ignite-invalid-request-timeout.xml} | 1 +
.../loader/thin/kafka-to-ignite-client-correct.xml | 5 +++
...afka-to-ignite-client-invalid-poll-timeout.xml} | 1 +
...a-to-ignite-client-invalid-request-timeout.xml} | 1 +
12 files changed, 90 insertions(+), 2 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index 2df4d204..5e12cc43 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -96,6 +96,7 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements
Runnable {
streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
"The Kafka partitions upper bound must be greater than lower
bound.");
A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request
timeout cannot be negative.");
+ A.ensure(streamerCfg.getKafkaConsumerPollTimeout() >= 0, "The Kafka
consumer poll timeout cannot be negative.");
A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must
me greater than zero.");
A.ensure(
streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >=
streamerCfg.getThreadCount(),
@@ -175,6 +176,7 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements
Runnable {
caches,
streamerCfg.getMaxBatchSize(),
streamerCfg.getKafkaRequestTimeout(),
+ streamerCfg.getKafkaConsumerPollTimeout(),
metaUpdr,
stopped
);
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index ceb688e9..d113bba7 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -107,6 +107,9 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable,
AutoCloseable {
/** The maximum time to complete Kafka related requests, in milliseconds.
*/
private final long kafkaReqTimeout;
+ /** Consumer poll timeout in milliseconds. */
+ private final long consumerPollTimeout;
+
/** Metadata updater. */
private final KafkaToIgniteMetadataUpdater metaUpdr;
@@ -132,6 +135,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable,
AutoCloseable {
* @param caches Cache ids.
* @param maxBatchSize Maximum batch size.
* @param kafkaReqTimeout The maximum time to complete Kafka related
requests, in milliseconds.
+ * @param consumerPollTimeout Consumer poll timeout in milliseconds.
* @param metaUpdr Metadata updater.
* @param stopped Stopped flag.
*/
@@ -145,6 +149,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable,
AutoCloseable {
Set<Integer> caches,
int maxBatchSize,
long kafkaReqTimeout,
+ long consumerPollTimeout,
KafkaToIgniteMetadataUpdater metaUpdr,
AtomicBoolean stopped
) {
@@ -155,6 +160,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable,
AutoCloseable {
this.kafkaPartTo = kafkaPartTo;
this.caches = caches;
this.kafkaReqTimeout = kafkaReqTimeout;
+ this.consumerPollTimeout = consumerPollTimeout;
this.metaUpdr = metaUpdr;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
@@ -213,7 +219,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable,
AutoCloseable {
* @param cnsmr Data consumer.
*/
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws
IgniteCheckedException {
- ConsumerRecords<Integer, byte[]> recs =
cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
+ ConsumerRecords<Integer, byte[]> recs =
cnsmr.poll(Duration.ofMillis(consumerPollTimeout));
if (log.isInfoEnabled()) {
log.info(
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
index 24675d43..a2e6e103 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -33,6 +33,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
/** Default maximum time to complete Kafka related requests, in
milliseconds. */
public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;
+ /** Default Kafka consumer poll timeout, in milliseconds. */
+ public static final long DFLT_KAFKA_CONSUMER_POLL_TIMEOUT = 3_000L;
+
/** Default {@link #threadCnt} value. */
public static final int DFLT_THREAD_CNT = 16;
@@ -57,6 +60,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
/** The maximum time to complete Kafka related requests, in milliseconds.
*/
private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
+ /** Kafka consumer poll timeout, in milliseconds. */
+ private long kafkaConsumerPollTimeout = DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
+
/** Metadata consumer group. */
private String metadataCnsmrGrp;
@@ -171,6 +177,22 @@ public class KafkaToIgniteCdcStreamerConfiguration {
this.kafkaReqTimeout = kafkaReqTimeout;
}
+ /**
+ * @return The kafka consumer poll timeout in milliseconds.
+ */
+ public long getKafkaConsumerPollTimeout() {
+ return kafkaConsumerPollTimeout;
+ }
+
+ /**
+ * Sets the kafka consumer poll timeout in milliseconds.
+ *
+ * @param timeout Timeout value.
+ */
+ public void setKafkaConsumerPollTimeout(long timeout) {
+ this.kafkaConsumerPollTimeout = timeout;
+ }
+
/**
* @return Metadata topic name.
*/
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index 97b5bfed..236dc2f7 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -61,6 +61,9 @@ public class KafkaToIgniteMetadataUpdater implements
AutoCloseable, OffsetCommit
/** The maximum time to complete Kafka related requests, in milliseconds.
*/
private final long kafkaReqTimeout;
+ /** Consumer poll timeout, in milliseconds. */
+ private final long consumerPollTimeout;
+
/** */
private final KafkaConsumer<Void, byte[]> cnsmr;
@@ -90,6 +93,7 @@ public class KafkaToIgniteMetadataUpdater implements
AutoCloseable, OffsetCommit
) {
this.ctx = ctx;
this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
+ this.consumerPollTimeout = streamerCfg.getKafkaConsumerPollTimeout();
this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);
Properties kafkaProps = new Properties();
@@ -141,7 +145,7 @@ public class KafkaToIgniteMetadataUpdater implements
AutoCloseable, OffsetCommit
}
while (true) {
- ConsumerRecords<Void, byte[]> recs =
cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
+ ConsumerRecords<Void, byte[]> recs =
cnsmr.poll(Duration.ofMillis(consumerPollTimeout));
if (recs.count() == 0) {
if (log.isDebugEnabled())
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 6938448b..918c0f8a 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.cdc.kafka;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -27,6 +29,12 @@ import static
org.apache.ignite.testframework.GridTestUtils.assertThrows;
/** Tests loading of {@link KafkaToIgniteCdcStreamer} from a Spring XML file. */
public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
+ /** Constant to reference from xml config. */
+ public static final int TEST_KAFKA_CONSUMER_POLL_TIMEOUT = 2000;
+
+ /** Constant to reference from xml config. */
+ public static final int TEST_KAFKA_REQUEST_TIMEOUT = 6000;
+
/** */
@Test
public void testLoadConfig() throws Exception {
@@ -85,6 +93,36 @@ public class KafkaToIgniteLoaderTest extends
GridCommonAbstractTest {
assertNotNull(streamer);
}
+ /** Tests setting timeout properties of kafka to ignite loaders. */
+ @Test
+ public void testLoadTimeoutProperties() throws Exception {
+ Stream.of(
+ new String[] {
+ "loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml",
+ "Ouch! Argument is invalid: The Kafka consumer poll timeout
cannot be negative."},
+ new String[] {
+
"loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml",
+ "Ouch! Argument is invalid: The Kafka request timeout cannot
be negative."
+ },
+ new String[] {
+ "loader/kafka-to-ignite-invalid-poll-timeout.xml",
+ "Ouch! Argument is invalid: The Kafka consumer poll timeout
cannot be negative."},
+ new String[] {
+ "loader/kafka-to-ignite-invalid-request-timeout.xml",
+ "Ouch! Argument is invalid: The Kafka request timeout cannot
be negative."
+ }
+ ).forEach(args -> assertThrows(null, () ->
loadKafkaToIgniteStreamer(args[0]), IllegalArgumentException.class, args[1]));
+
+ Stream.<AbstractKafkaToIgniteCdcStreamer>of(
+
loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-correct.xml"),
+ loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml")
+ ).forEach(streamer -> {
+ assertNotNull(streamer);
+ assertEquals(TEST_KAFKA_CONSUMER_POLL_TIMEOUT,
streamer.streamerCfg.getKafkaConsumerPollTimeout());
+ assertEquals(TEST_KAFKA_REQUEST_TIMEOUT,
streamer.streamerCfg.getKafkaRequestTimeout());
+ });
+ }
+
/** */
@Test
public void testInitSpringContextOnce() throws Exception {
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
index 26b95d9e..5dc764fe 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
@@ -41,6 +41,7 @@ import static
org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_TOPIC
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
import static
org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
import static
org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
+import static
org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
import static
org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -184,6 +185,7 @@ public class KafkaToIgniteMetadataUpdaterTest extends
GridCommonAbstractTest {
cfg.setKafkaPartsFrom(0);
cfg.setKafkaPartsTo(DFLT_PARTS);
cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
+ cfg.setKafkaConsumerPollTimeout(DFLT_KAFKA_CONSUMER_POLL_TIMEOUT);
return cfg;
}
diff --git
a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
index b6934241..5b8655ab 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -36,11 +36,16 @@
</property>
</bean>
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
<bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git
a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml
similarity index 97%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to
modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml
index b6934241..724fe443 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++
b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml
@@ -41,6 +41,7 @@
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
+ <property name="kafkaConsumerPollTimeout" value="-1"/>
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git
a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml
similarity index 97%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to
modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml
index b6934241..8723df25 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++
b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml
@@ -41,6 +41,7 @@
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" value="-1"/>
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git
a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
index caa89488..ba5d179c 100644
---
a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
+++
b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
@@ -31,11 +31,16 @@
</property>
</bean>
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT"
id="consumerPollTimeout" />
+ <util:constant
static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT"
id="requestTimeout" />
+
<bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" ref="requestTimeout" />
+ <property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git
a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml
similarity index 96%
copy from
modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
copy to
modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml
index caa89488..cd0c7864 100644
---
a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
+++
b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml
@@ -36,6 +36,7 @@
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
+ <property name="kafkaConsumerPollTimeout" value="-1"/>
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
diff --git
a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml
similarity index 97%
copy from
modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
copy to
modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml
index caa89488..3255d7bd 100644
---
a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
+++
b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml
@@ -36,6 +36,7 @@
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
+ <property name="kafkaRequestTimeout" value="-1"/>
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />