This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bd39e8c IGNITE-22403 Fixed partition ranges assignment for appliers of KafkaToIgnite streamers (#272)
5bd39e8c is described below

commit 5bd39e8c55006b52000c43d18b8b07a7949d7f40
Author: Maksim Davydov <[email protected]>
AuthorDate: Wed Jul 10 17:57:49 2024 +0300

    IGNITE-22403 Fixed partition ranges assignment for appliers of KafkaToIgnite streamers (#272)
---
 .../kafka/AbstractKafkaToIgniteCdcStreamer.java    | 47 +++++++++++------
 .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java |  4 ++
 .../KafkaToIgnitePartitionDistributionTest.java    | 59 ++++++++++++++++++++++
 3 files changed, 95 insertions(+), 15 deletions(-)

diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index 5e12cc43..18de8cb6 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.GridLoggerProxy;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -153,26 +154,16 @@ abstract class AbstractKafkaToIgniteCdcStreamer 
implements Runnable {
             streamerCfg
         );
 
-        int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
-        int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
-        int threadCnt = streamerCfg.getThreadCount();
-
-        int partPerApplier = kafkaParts / threadCnt;
-
-        for (int i = 0; i < threadCnt; i++) {
-            int from = i * partPerApplier;
-            int to = (i + 1) * partPerApplier;
-
-            if (i == threadCnt - 1)
-                to = kafkaParts;
+        int cntr = 0;
 
+        for (T2<Integer, Integer> parts : kafkaPartitions(streamerCfg)) {
             KafkaToIgniteCdcStreamerApplier applier = new 
KafkaToIgniteCdcStreamerApplier(
                 () -> eventsApplier(),
                 log,
                 kafkaProps,
                 streamerCfg.getTopic(),
-                kafkaPartsFrom + from,
-                kafkaPartsFrom + to,
+                parts.get1(), // kafkaPartFrom
+                parts.get2(), // kafkaPartTo
                 caches,
                 streamerCfg.getMaxBatchSize(),
                 streamerCfg.getKafkaRequestTimeout(),
@@ -181,7 +172,7 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements 
Runnable {
                 stopped
             );
 
-            addAndStart("applier-thread-" + i, applier);
+            addAndStart("applier-thread-" + cntr++, applier);
         }
 
         try {
@@ -197,6 +188,32 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements 
Runnable {
         }
     }
 
+    /**
+     * Calculates Kafka partition ranges per applier thread.
+     * <p>
+     * The span between {@code getKafkaPartsFrom()} and {@code getKafkaPartsTo()} is cut into consecutive
+     * ranges: the second element of each pair equals the first element of the next pair. On every step the
+     * next range takes the remaining partitions count divided by the remaining threads count, rounded up,
+     * so range sizes differ by at most one and the larger ranges come first.
+     * <p>
+     * The result holds at most {@code getThreadCount()} ranges. It holds fewer when there are fewer
+     * partitions than threads, and it is empty when {@code getKafkaPartsTo()} is not greater than
+     * {@code getKafkaPartsFrom()}.
+     * <p>
+     * NOTE(review): the thread count is not validated here; a zero thread count with a non-empty span
+     * divides by zero below. Presumably the configuration is validated by the caller - confirm.
+     *
+     * @param streamerCfg {@link KafkaToIgniteCdcStreamerConfiguration}.
+     * @return List of pairs defining partition ranges for each applier thread.
+     */
+    public static List<T2<Integer, Integer>> 
kafkaPartitions(KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
+        List<T2<Integer, Integer>> parts = new ArrayList<>();
+
+        int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
+        int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom; // Partitions not assigned yet.
+        int threadCnt = streamerCfg.getThreadCount(); // Threads that still have no range.
+
+        while (kafkaParts > 0) {
+            int partPerApplier = kafkaParts / threadCnt + (kafkaParts % 
threadCnt > 0 ? 1 : 0); // Ceiling of kafkaParts / threadCnt.
+
+            kafkaParts -= partPerApplier;
+            --threadCnt; // One thread less to share the remaining partitions.
+
+            parts.add(new T2<>(kafkaPartsFrom, kafkaPartsFrom + 
partPerApplier));
+
+            kafkaPartsFrom += partPerApplier; // The next range starts where this one ends.
+        }
+
+        return parts;
+    }
+
     /** Adds applier to {@link #appliers} and starts thread with it. */
     private <T extends AutoCloseable & Runnable> void addAndStart(String 
threadName, T applier) {
         appliers.add(applier);
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index d113bba7..3dab9c79 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -168,6 +168,10 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, 
AutoCloseable {
 
     /** {@inheritDoc} */
     @Override public void run() {
+        if (log.isInfoEnabled())
+            log.info("Kafka to Ignite applier started [topic=" + topic + ", 
partFrom=" + kafkaPartFrom +
+                ", partTo=" + kafkaPartTo + "].");
+
         applier = applierSupplier.get();
 
         try {
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java
new file mode 100644
index 00000000..88dbf28f
--- /dev/null
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.List;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static 
org.apache.ignite.cdc.kafka.AbstractKafkaToIgniteCdcStreamer.kafkaPartitions;
+import static org.apache.ignite.internal.util.lang.GridFunc.t;
+
+/** Tests Kafka partition ranges assignment to the multiple applier threads. */
+public class KafkaToIgnitePartitionDistributionTest extends 
GridCommonAbstractTest {
+    /** Streamer configuration, reconfigured by every {@code doTest} call before ranges are calculated. */
+    private final KafkaToIgniteCdcStreamerConfiguration streamerCfg = new 
KafkaToIgniteCdcStreamerConfiguration();
+
+    /**
+     * Checks the ranges calculated for 3, 4 and 5 threads over partitions 0 to 8 and 3 to 11 (partitions
+     * count divisible and not divisible by threads count), and for 3 threads over 3 partitions.
+     */
+    @Test
+    public void testKafkaPartitions() {
+        doTest(3, 0, 8, asList(t(0, 3), t(3, 6), t(6, 8)));
+        doTest(4, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 8)));
+        doTest(5, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 7), t(7, 8)));
+
+        doTest(3, 3, 11, asList(t(3, 6), t(6, 9), t(9, 11)));
+        doTest(4, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 11)));
+        doTest(5, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 10), t(10, 
11)));
+
+        doTest(3, 1, 4, asList(t(1, 2), t(2, 3), t(3, 4)));
+    }
+
+    /**
+     * Applies the given settings to {@link #streamerCfg} and asserts that the calculated partition ranges
+     * are equal to the expected ones, in the same order.
+     *
+     * @param threadCnt Applier threads count.
+     * @param partFrom Value passed to {@code setKafkaPartsFrom}.
+     * @param partTo Value passed to {@code setKafkaPartsTo}.
+     * @param expParts List of expected partition ranges.
+     */
+    private void doTest(int threadCnt, int partFrom, int partTo, 
List<IgniteBiTuple<Integer, Integer>> expParts) {
+        streamerCfg.setThreadCount(threadCnt);
+        streamerCfg.setKafkaPartsFrom(partFrom);
+        streamerCfg.setKafkaPartsTo(partTo);
+
+        assertEquals(expParts, kafkaPartitions(streamerCfg));
+    }
+}

Reply via email to