This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new fcfb076d IGNITE-23388 Use SortedMap for collecting CdcEvents (#291)
fcfb076d is described below

commit fcfb076deba2621b296d2ab193c1ba0d16da60bf
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Oct 18 15:31:00 2024 +0300

    IGNITE-23388 Use SortedMap for collecting CdcEvents (#291)
---
 .../ignite/cdc/AbstractCdcEventsApplier.java       | 18 +++++--
 .../apache/ignite/cdc/AbstractReplicationTest.java | 57 ++++++++++++++++++++++
 .../cdc/CdcIgniteToIgniteReplicationTest.java      | 21 +++++---
 .../cdc/kafka/CdcKafkaReplicationAppsTest.java     | 10 ++--
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java  | 42 +++++++++++-----
 5 files changed, 119 insertions(+), 29 deletions(-)

diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
index 479a1db4..ccf3f804 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
@@ -17,8 +17,9 @@
 
 package org.apache.ignite.cdc;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -36,10 +37,10 @@ public abstract class AbstractCdcEventsApplier<K, V> {
     private final int maxBatchSize;
 
     /** Update batch. */
-    private final Map<K, V> updBatch = new HashMap<>();
+    private final SortedMap<K, V> updBatch = new TreeMap<>(this::compareKey);
 
     /** Remove batch. */
-    private final Map<K, GridCacheVersion> rmvBatch = new HashMap<>();
+    private final SortedMap<K, GridCacheVersion> rmvBatch = new 
TreeMap<>(this::compareKey);
 
     /** */
     private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
@@ -109,13 +110,12 @@ public abstract class AbstractCdcEventsApplier<K, V> {
      * @param applyUpd Apply update batch flag supplier.
      * @param applyRmv Apply remove batch flag supplier.
      * @return Number of applied events.
-     * @throws IgniteCheckedException In case of error.
      */
     private int applyIf(
         int cacheId,
         BooleanSupplier applyUpd,
         BooleanSupplier applyRmv
-    ) throws IgniteCheckedException {
+    ) {
         int evtsApplied = 0;
 
         if (applyUpd.getAsBoolean()) {
@@ -151,6 +151,14 @@ public abstract class AbstractCdcEventsApplier<K, V> {
     /** @return Key. */
     protected abstract K toKey(CdcEvent evt);
 
+    /**
+     * Compares keys hash codes only, because bytes might not be available.
+     * If hash codes are equal it put {@code key2} to next batch, see {@link 
#isApplyBatch)}.
+     */
+    private int compareKey(Object key1, Object key2) {
+        return Integer.compare(key1.hashCode(), key2.hashCode());
+    }
+
     /** @return Value. */
     protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion 
ver);
 
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 9eddeea7..9a5e18c7 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -93,6 +94,7 @@ import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_P
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeTrue;
 
 /** */
 @RunWith(Parameterized.class)
@@ -310,6 +312,61 @@ public abstract class AbstractReplicationTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /** Test that CDC instances don't lock each other while streaming mixed 
keys. */
+    @Test
+    public void testConcurrentMixedKeys() throws Exception {
+        assumeTrue(atomicity == TRANSACTIONAL);
+
+        for (IgniteEx ign: F.asList(srcCluster[0], destCluster[0])) {
+            ign.createCache(new CacheConfiguration<TestKey, Integer>()
+                    .setName(ACTIVE_PASSIVE_CACHE)
+                    .setAtomicityMode(atomicity)
+                    .setBackups(backups)
+                    .setCacheMode(mode));
+        }
+
+        List<IgniteInternalFuture<?>> futs = 
startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
+
+        try {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            int cnt = 0;
+
+            // Setup bound for keys to increase probability to stream same 
keys through each CDC instance.
+            // The value should not be small to force CDC generates batches 
for putAllConflict call.
+            int keysCnt = 20;
+
+            while (cnt++ < 10_000) {
+                srcCluster[rnd.nextInt(2)]
+                    .cache(ACTIVE_PASSIVE_CACHE)
+                    .put(new TestKey(rnd.nextInt(keysCnt), null), 
rnd.nextInt());
+
+                if (cnt % 1_000 == 0)
+                    System.out.println("Load count = " + cnt);
+            }
+
+            // Check that all data received.
+            assertTrue(waitForCondition(() -> {
+                IgniteCache<TestKey, Integer> srcCache = 
srcCluster[0].cache(ACTIVE_PASSIVE_CACHE);
+                IgniteCache<TestKey, Integer> destCache = 
destCluster[0].cache(ACTIVE_PASSIVE_CACHE);
+
+                for (int i = 0; i < keysCnt; i++) {
+                    Integer srcVal = srcCache.get(new TestKey(i, null));
+                    Integer destVal = destCache.get(new TestKey(i, null));
+
+                    if (srcVal == null || !srcVal.equals(destVal))
+                        return false;
+                }
+
+                return true;
+
+            }, getTestTimeout()));
+        }
+        finally {
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.cancel();
+        }
+    }
+
     /** Replication with complex SQL key. Data inserted via SQL. */
     @Test
     public void testActivePassiveReplicationComplexKeyWithSQL() throws 
Exception {
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 048721a4..b6d42e24 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.ignite.spi.systemview.view.SystemView;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT;
@@ -47,7 +48,7 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgnite(srcCluster[i].configuration(), 
destClusterCliCfg[i], destCluster, cache));
+            futs.add(igniteToIgnite(srcCluster[i].configuration(), 
destClusterCliCfg[i], destCluster, cache, "ignite-to-ignite-src-" + i));
 
         return futs;
     }
@@ -56,11 +57,15 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
     @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
-        for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgnite(srcCluster[i].configuration(), 
destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE));
+        for (int i = 0; i < srcCluster.length; i++) {
+            futs.add(igniteToIgnite(
+                srcCluster[i].configuration(), destClusterCliCfg[i], 
destCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-src-" + i));
+        }
 
-        for (int i = 0; i < destCluster.length; i++)
-            futs.add(igniteToIgnite(destCluster[i].configuration(), 
srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE));
+        for (int i = 0; i < destCluster.length; i++) {
+            futs.add(igniteToIgnite(
+                destCluster[i].configuration(), srcClusterCliCfg[i], 
srcCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-dest-" + i));
+        }
 
         return futs;
     }
@@ -81,13 +86,15 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
      * @param destCfg Ignite destination cluster configuration.
      * @param dest Ignite destination cluster.
      * @param cache Cache name to stream to kafka.
+     * @param threadName Thread to run CDC instance.
      * @return Future for Change Data Capture application.
      */
     protected IgniteInternalFuture<?> igniteToIgnite(
         IgniteConfiguration srcCfg,
         IgniteConfiguration destCfg,
         IgniteEx[] dest,
-        String cache
+        String cache,
+        @Nullable String threadName
     ) {
         return runAsync(() -> {
             CdcConfiguration cdcCfg = new CdcConfiguration();
@@ -117,7 +124,7 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
             cdcs.add(cdc);
 
             cdc.run();
-        });
+        }, threadName);
     }
 
     /** */
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 96f1d242..927a7b25 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -112,7 +112,8 @@ public class CdcKafkaReplicationAppsTest extends 
CdcKafkaReplicationTest {
         IgniteConfiguration igniteCfg,
         String topic,
         String metadataTopic,
-        String cache
+        String cache,
+        String threadName
     ) {
         Map<String, String> params = new HashMap<>();
 
@@ -127,7 +128,7 @@ public class CdcKafkaReplicationAppsTest extends 
CdcKafkaReplicationTest {
         params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
 
         return runAsync(
-            () -> CdcCommandLineStartup.main(new String[] 
{prepareConfig("/replication/ignite-to-kafka.xml", params)})
+            () -> CdcCommandLineStartup.main(new String[] 
{prepareConfig("/replication/ignite-to-kafka.xml", params)}), threadName
         );
     }
 
@@ -139,7 +140,8 @@ public class CdcKafkaReplicationAppsTest extends 
CdcKafkaReplicationTest {
         IgniteConfiguration igniteCfg,
         IgniteEx[] dest,
         int partFrom,
-        int partTo
+        int partTo,
+        String threadName
     ) {
         Map<String, String> params = new HashMap<>();
 
@@ -174,7 +176,7 @@ public class CdcKafkaReplicationAppsTest extends 
CdcKafkaReplicationTest {
         params.put(METRIC_REG_NAME, DFLT_METRICS_REG_NAME + "-" + 
igniteCfg.getIgniteInstanceName());
 
         return runAsync(
-            () -> KafkaToIgniteCommandLineStartup.main(new String[] 
{prepareConfig(cfg, params)})
+            () -> KafkaToIgniteCommandLineStartup.main(new String[] 
{prepareConfig(cfg, params)}), threadName
         );
     }
 
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 05c5154f..a56b2941 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -104,8 +104,11 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
 
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
-        for (IgniteEx ex : srcCluster)
-            futs.add(igniteToKafka(ex.configuration(), cache, 
SRC_DEST_META_TOPIC, cache));
+        for (IgniteEx ex : srcCluster) {
+            int idx = getTestIgniteInstanceIndex(ex.name());
+
+            futs.add(igniteToKafka(ex.configuration(), cache, 
SRC_DEST_META_TOPIC, cache, "ignite-src-to-kafka-" + idx));
+        }
 
         for (int i = 0; i < destCluster.length; i++) {
             futs.add(kafkaToIgnite(
@@ -115,7 +118,8 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
                 destClusterCliCfg[i],
                 destCluster,
                 i * (DFLT_PARTS / 2),
-                (i + 1) * (DFLT_PARTS / 2)
+                (i + 1) * (DFLT_PARTS / 2),
+                 "kafka-to-ignite-dest-" + i
             ));
         }
 
@@ -126,11 +130,19 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
     @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
-        for (IgniteEx ex : srcCluster)
-            futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, 
SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE));
+        for (IgniteEx ex : srcCluster) {
+            int idx = getTestIgniteInstanceIndex(ex.name());
+
+            futs.add(igniteToKafka(
+                    ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, 
ACTIVE_ACTIVE_CACHE, "ignite-src-to-kafka-" + idx));
+        }
 
-        for (IgniteEx ex : destCluster)
-            futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, 
DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE));
+        for (IgniteEx ex : destCluster) {
+            int idx = getTestIgniteInstanceIndex(ex.name());
+
+            futs.add(igniteToKafka(
+                    ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, 
ACTIVE_ACTIVE_CACHE, "ignite-dest-to-kafka-" + idx));
+        }
 
         futs.add(kafkaToIgnite(
             ACTIVE_ACTIVE_CACHE,
@@ -139,7 +151,8 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
             destClusterCliCfg[0],
             destCluster,
             0,
-            DFLT_PARTS
+            DFLT_PARTS,
+            "kafka-to-ignite-src"
         ));
 
         futs.add(kafkaToIgnite(
@@ -149,7 +162,8 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
             srcClusterCliCfg[0],
             srcCluster,
             0,
-            DFLT_PARTS
+            DFLT_PARTS,
+            "kafka-to-ignite-dest"
         ));
 
         return futs;
@@ -247,7 +261,8 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
         IgniteConfiguration igniteCfg,
         String topic,
         String metadataTopic,
-        String cache
+        String cache,
+        String threadName
     ) {
         return runAsync(() -> {
             IgniteToKafkaCdcStreamer cdcCnsmr = new IgniteToKafkaCdcStreamer()
@@ -270,7 +285,7 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
             cdcs.add(cdc);
 
             cdc.run();
-        });
+        }, threadName);
     }
 
     /**
@@ -286,7 +301,8 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
         IgniteConfiguration igniteCfg,
         IgniteEx[] dest,
         int fromPart,
-        int toPart
+        int toPart,
+        String threadName
     ) {
         KafkaToIgniteCdcStreamerConfiguration cfg = new 
KafkaToIgniteCdcStreamerConfiguration();
 
@@ -315,7 +331,7 @@ public class CdcKafkaReplicationTest extends 
AbstractReplicationTest {
 
         kafkaStreamers.add(streamer);
 
-        return runAsync(streamer);
+        return runAsync(streamer, threadName);
     }
 
     /** */

Reply via email to