This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 5bd39e8c IGNITE-22403 Fixed partition ranges assignment for appliers
of KafkaToIgnite streamers (#272)
5bd39e8c is described below
commit 5bd39e8c55006b52000c43d18b8b07a7949d7f40
Author: Maksim Davydov <[email protected]>
AuthorDate: Wed Jul 10 17:57:49 2024 +0300
IGNITE-22403 Fixed partition ranges assignment for appliers of
KafkaToIgnite streamers (#272)
---
.../kafka/AbstractKafkaToIgniteCdcStreamer.java | 47 +++++++++++------
.../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java | 4 ++
.../KafkaToIgnitePartitionDistributionTest.java | 59 ++++++++++++++++++++++
3 files changed, 95 insertions(+), 15 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index 5e12cc43..18de8cb6 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -153,26 +154,16 @@ abstract class AbstractKafkaToIgniteCdcStreamer
implements Runnable {
streamerCfg
);
- int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
- int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
- int threadCnt = streamerCfg.getThreadCount();
-
- int partPerApplier = kafkaParts / threadCnt;
-
- for (int i = 0; i < threadCnt; i++) {
- int from = i * partPerApplier;
- int to = (i + 1) * partPerApplier;
-
- if (i == threadCnt - 1)
- to = kafkaParts;
+ int cntr = 0;
+ for (T2<Integer, Integer> parts : kafkaPartitions(streamerCfg)) {
KafkaToIgniteCdcStreamerApplier applier = new
KafkaToIgniteCdcStreamerApplier(
() -> eventsApplier(),
log,
kafkaProps,
streamerCfg.getTopic(),
- kafkaPartsFrom + from,
- kafkaPartsFrom + to,
+ parts.get1(), // kafkaPartFrom
+ parts.get2(), // kafkaPartTo
caches,
streamerCfg.getMaxBatchSize(),
streamerCfg.getKafkaRequestTimeout(),
@@ -181,7 +172,7 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements
Runnable {
stopped
);
- addAndStart("applier-thread-" + i, applier);
+ addAndStart("applier-thread-" + cntr++, applier);
}
try {
@@ -197,6 +188,32 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements
Runnable {
}
}
+ /**
+ * Calculates Kafka partition ranges per applier thread.
+ * <p>
+ * The interval between {@code getKafkaPartsFrom()} and {@code getKafkaPartsTo()} is cut into consecutive
+ * ranges. On every step the next range receives the number of still unassigned partitions divided by the
+ * number of still unassigned threads, rounded up; each range starts where the previous one ends.
+ * The loop stops as soon as no partitions are left, so the list is empty when the interval is empty and
+ * may contain fewer ranges than the configured thread count.
+ * <p>
+ * NOTE(review): the thread count is used as a divisor; presumably it is validated to be positive
+ * elsewhere - confirm against the configuration checks.
+ *
+ * @param streamerCfg {@link KafkaToIgniteCdcStreamerConfiguration}.
+ * @return List of pairs (range start, range end) defining partition ranges for each applier thread.
+ */
+ public static List<T2<Integer, Integer>>
kafkaPartitions(KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
+ List<T2<Integer, Integer>> parts = new ArrayList<>();
+
+ int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
+ int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
+ int threadCnt = streamerCfg.getThreadCount();
+
+ while (kafkaParts > 0) {
+ int partPerApplier = kafkaParts / threadCnt + (kafkaParts %
threadCnt > 0 ? 1 : 0);
+
+ kafkaParts -= partPerApplier;
+ --threadCnt;
+
+ parts.add(new T2<>(kafkaPartsFrom, kafkaPartsFrom +
partPerApplier));
+
+ kafkaPartsFrom += partPerApplier;
+ }
+
+ return parts;
+ }
+
/** Adds applier to {@link #appliers} and starts thread with it. */
private <T extends AutoCloseable & Runnable> void addAndStart(String
threadName, T applier) {
appliers.add(applier);
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index d113bba7..3dab9c79 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -168,6 +168,10 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable,
AutoCloseable {
/** {@inheritDoc} */
@Override public void run() {
+ if (log.isInfoEnabled())
+ log.info("Kafka to Ignite applier started [topic=" + topic + ",
partFrom=" + kafkaPartFrom +
+ ", partTo=" + kafkaPartTo + "].");
+
applier = applierSupplier.get();
try {
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java
new file mode 100644
index 00000000..88dbf28f
--- /dev/null
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.List;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static
org.apache.ignite.cdc.kafka.AbstractKafkaToIgniteCdcStreamer.kafkaPartitions;
+import static org.apache.ignite.internal.util.lang.GridFunc.t;
+
+/** Tests Kafka partition ranges assignment to the multiple applier threads. */
+public class KafkaToIgnitePartitionDistributionTest extends
GridCommonAbstractTest {
+ /** Streamer configuration that {@code doTest} reconfigures before every check. */
+ private final KafkaToIgniteCdcStreamerConfiguration streamerCfg = new
KafkaToIgniteCdcStreamerConfiguration();
+
+ /** Checks the ranges returned by {@code kafkaPartitions} for several thread counts and partition intervals. */
+ @Test
+ public void testKafkaPartitions() {
+ doTest(3, 0, 8, asList(t(0, 3), t(3, 6), t(6, 8)));
+ doTest(4, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 8)));
+ doTest(5, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 7), t(7, 8)));
+
+ doTest(3, 3, 11, asList(t(3, 6), t(6, 9), t(9, 11)));
+ doTest(4, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 11)));
+ doTest(5, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 10), t(10,
11)));
+
+ doTest(3, 1, 4, asList(t(1, 2), t(2, 3), t(3, 4)));
+ }
+
+ /**
+ * Applies the given settings to the shared configuration and compares the calculated ranges with the
+ * expected ones.
+ *
+ * @param threadCnt Applier threads count.
+ * @param partFrom Value passed to {@code setKafkaPartsFrom}.
+ * @param partTo Value passed to {@code setKafkaPartsTo}.
+ * @param expParts List of expected partition ranges.
+ */
+ private void doTest(int threadCnt, int partFrom, int partTo,
List<IgniteBiTuple<Integer, Integer>> expParts) {
+ streamerCfg.setThreadCount(threadCnt);
+ streamerCfg.setKafkaPartsFrom(partFrom);
+ streamerCfg.setKafkaPartsTo(partTo);
+
+ assertEquals(expParts, kafkaPartitions(streamerCfg));
+ }
+}