This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch IGNITE-23388
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git

commit d1aeeb151ed72fffe264a96d40bd6459ffe90da8
Author: Maksim Timonin <[email protected]>
AuthorDate: Mon Oct 14 14:03:30 2024 +0300

    IGNITE-23388 Use TreeMap for collecting CdcEvents
---
 .../ignite/cdc/AbstractCdcEventsApplier.java       | 51 ++++++++++++----
 .../ignite/cdc/AbstractIgniteCdcStreamer.java      |  2 +-
 .../apache/ignite/cdc/CdcEventsIgniteApplier.java  | 13 +---
 .../cdc/thin/CdcEventsIgniteClientApplier.java     | 12 ++--
 .../apache/ignite/cdc/AbstractReplicationTest.java | 70 ++++++++++++++++++----
 .../cdc/CdcIgniteToIgniteReplicationTest.java      | 21 ++++---
 6 files changed, 121 insertions(+), 48 deletions(-)

diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
index 479a1db4..3f0a35a8 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
@@ -17,12 +17,16 @@
 
 package org.apache.ignite.cdc;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -31,15 +35,15 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFIN
 /**
  * Contains logic to process {@link CdcEvent} and apply them to the cluster.
  */
-public abstract class AbstractCdcEventsApplier<K, V> {
+public abstract class AbstractCdcEventsApplier<V> {
     /** Maximum batch size. */
     private final int maxBatchSize;
 
     /** Update batch. */
-    private final Map<K, V> updBatch = new HashMap<>();
+    private final Map<KeyCacheObject, V> updBatch = new 
TreeMap<>(this::compareKeyCacheObject);
 
     /** Remove batch. */
-    private final Map<K, GridCacheVersion> rmvBatch = new HashMap<>();
+    private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new 
TreeMap<>(this::compareKeyCacheObject);
 
     /** */
     private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
@@ -81,7 +85,7 @@ public abstract class AbstractCdcEventsApplier<K, V> {
             }
 
             CacheEntryVersion order = evt.version();
-            K key = toKey(evt);
+            KeyCacheObject key = toKey(evt);
             GridCacheVersion ver = new 
GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), 
order.clusterId());
 
             if (evt.value() != null) {
@@ -144,19 +148,46 @@ public abstract class AbstractCdcEventsApplier<K, V> {
     }
 
     /** @return {@code True} if update batch should be applied. */
-    private boolean isApplyBatch(Map<K, ?> map, K key) {
+    private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject 
key) {
         return map.size() >= maxBatchSize || map.containsKey(key);
     }
 
-    /** @return Key. */
-    protected abstract K toKey(CdcEvent evt);
+    /** @return Key as KeyCacheObject. */
+    private KeyCacheObject toKey(CdcEvent evt) {
+        Object key = evt.key();
+
+        if (key instanceof KeyCacheObject)
+            return (KeyCacheObject)key;
+        else
+            return new KeyCacheObjectImpl(key, evt.keyBytes(), 
evt.partition());
+    }
+
+    /** Compares keys. */
+    private int compareKeyCacheObject(KeyCacheObject key1, KeyCacheObject 
key2) {
+        int cmp = Integer.compare(key1.hashCode(), key2.hashCode());
+
+        if (cmp != 0)
+            return cmp;
+
+        try {
+            // Bytes are cached in KeyCacheObject, because they are 
constructed from binary WAL segment files.
+            // Hence, no NPE is possible.
+            byte[] bytes1 = key1.valueBytes(null);
+            byte[] bytes2 = key2.valueBytes(null);
+
+            return CacheDataTree.compareBytes(bytes1, bytes2);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to compare keys in CdcEvent", e);
+        }
+    }
 
     /** @return Value. */
     protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion 
ver);
 
     /** Stores DR data. */
-    protected abstract void putAllConflict(int cacheId, Map<K, V> drMap);
+    protected abstract void putAllConflict(int cacheId, Map<KeyCacheObject, V> 
drMap);
 
     /** Removes DR data. */
-    protected abstract void removeAllConflict(int cacheId, Map<K, 
GridCacheVersion> drMap);
+    protected abstract void removeAllConflict(int cacheId, Map<KeyCacheObject, 
GridCacheVersion> drMap);
 }
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java
index f56a9954..ed453810 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java
@@ -80,7 +80,7 @@ public abstract class AbstractIgniteCdcStreamer implements 
CdcConsumer {
     protected int maxBatchSize;
 
     /** Events applier. */
-    protected AbstractCdcEventsApplier<?, ?> applier;
+    protected AbstractCdcEventsApplier<?> applier;
 
     /** Timestamp of last sent message. */
     protected AtomicLongMetric lastEvtTs;
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
index 9afda275..76a51515 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
@@ -26,7 +26,6 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import 
org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -44,7 +43,7 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETE
  * @see IgniteInternalCache#putAllConflict(Map)
  * @see IgniteInternalCache#removeAllConflict(Map)
  */
-public class CdcEventsIgniteApplier extends 
AbstractCdcEventsApplier<KeyCacheObject, GridCacheDrInfo> {
+public class CdcEventsIgniteApplier extends 
AbstractCdcEventsApplier<GridCacheDrInfo> {
     /** Destination cluster. */
     private final IgniteEx ignite;
 
@@ -82,16 +81,6 @@ public class CdcEventsIgniteApplier extends 
AbstractCdcEventsApplier<KeyCacheObj
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected KeyCacheObject toKey(CdcEvent evt) {
-        Object key = evt.key();
-
-        if (key instanceof KeyCacheObject)
-            return (KeyCacheObject)key;
-        else
-            return new KeyCacheObjectImpl(key, null, evt.partition());
-    }
-
     /** {@inheritDoc} */
     @Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, 
GridCacheVersion ver) {
         CacheObject cacheObj;
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
index 2a991213..2efac3ee 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cdc.AbstractCdcEventsApplier;
 import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.collection.IntHashMap;
 import org.apache.ignite.internal.util.collection.IntMap;
@@ -35,7 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
  * @see TcpClientCache#putAllConflict(Map)
  * @see TcpClientCache#removeAllConflict(Map)
  */
-public class CdcEventsIgniteClientApplier extends 
AbstractCdcEventsApplier<Object, T3<Object, GridCacheVersion, Long>> {
+public class CdcEventsIgniteClientApplier extends 
AbstractCdcEventsApplier<T3<Object, GridCacheVersion, Long>> {
     /** Client connected to the destination cluster. */
     private final IgniteClient client;
 
@@ -53,23 +54,18 @@ public class CdcEventsIgniteClientApplier extends 
AbstractCdcEventsApplier<Objec
         this.client = client;
     }
 
-    /** {@inheritDoc} */
-    @Override protected Object toKey(CdcEvent evt) {
-        return evt.key();
-    }
-
     /** {@inheritDoc} */
     @Override protected T3<Object, GridCacheVersion, Long> toValue(int 
cacheId, CdcEvent evt, GridCacheVersion ver) {
         return new T3<>(evt.value(), ver, evt.expireTime());
     }
 
     /** {@inheritDoc} */
-    @Override protected void putAllConflict(int cacheId, Map<Object, 
T3<Object, GridCacheVersion, Long>> drMap) {
+    @Override protected void putAllConflict(int cacheId, Map<KeyCacheObject, 
T3<Object, GridCacheVersion, Long>> drMap) {
         cache(cacheId).putAllConflict(drMap);
     }
 
     /** {@inheritDoc} */
-    @Override protected void removeAllConflict(int cacheId, Map<Object, 
GridCacheVersion> drMap) {
+    @Override protected void removeAllConflict(int cacheId, 
Map<KeyCacheObject, GridCacheVersion> drMap) {
         cache(cacheId).removeAllConflict(drMap);
     }
 
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 9eddeea7..2542a077 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -18,13 +18,8 @@
 package org.apache.ignite.cdc;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -90,9 +85,8 @@ import static 
org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName;
 import static org.apache.ignite.internal.cdc.WalRecordsConsumer.EVTS_CNT;
 import static org.apache.ignite.internal.cdc.WalRecordsConsumer.LAST_EVT_TIME;
 import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
-import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
-import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.testframework.GridTestUtils.*;
+import static org.junit.Assume.assumeTrue;
 
 /** */
 @RunWith(Parameterized.class)
@@ -310,6 +304,62 @@ public abstract class AbstractReplicationTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /** Test that CDC instances don't lock each other while streaming same 
keys from primary and backup. */
+    @Test
+    public void testConcurrentMixedKeys() throws Exception {
+        assumeTrue(backups > 0);
+        assumeTrue(atomicity == TRANSACTIONAL);
+
+        List<IgniteInternalFuture<?>> futs = 
startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
+
+        try {
+            for (IgniteEx ign: F.asList(srcCluster[0], destCluster[0])) {
+                ign.createCache(new CacheConfiguration<TestKey, Integer>()
+                        .setName(ACTIVE_PASSIVE_CACHE)
+                        .setAtomicityMode(atomicity)
+                        .setBackups(backups)
+                        .setCacheMode(mode));
+            }
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            int cnt = 0;
+
+            // Setup bound for keys to increase probability to stream same 
keys through each CDC instance.
+            // The value should not be small to force CDC generates batches 
for putAllConflict call.
+            int keysCnt = 20;
+
+            while (cnt++ < 10_000) {
+                srcCluster[rnd.nextInt(2)]
+                    .cache(ACTIVE_PASSIVE_CACHE)
+                    .put(new TestKey(rnd.nextInt(keysCnt), null), 
rnd.nextInt());
+
+                if (cnt % 1_000 == 0)
+                    System.out.println("Load count = " + cnt);
+            }
+
+            // Check that all data received.
+            assertTrue(waitForCondition(() -> {
+                IgniteCache<TestKey, Integer> srcCache = 
srcCluster[0].cache(ACTIVE_PASSIVE_CACHE);
+                IgniteCache<TestKey, Integer> destCache = 
destCluster[0].cache(ACTIVE_PASSIVE_CACHE);
+
+                for (int i = 0; i < keysCnt; i++) {
+                    Integer srcVal = srcCache.get(new TestKey(i, null));
+                    Integer destVal = destCache.get(new TestKey(i, null));
+
+                    if (srcVal == null || !srcVal.equals(destVal))
+                        return false;
+                }
+
+                return true;
+
+            }, getTestTimeout()));
+        }
+        finally {
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.cancel();
+        }
+    }
+
     /** Replication with complex SQL key. Data inserted via SQL. */
     @Test
     public void testActivePassiveReplicationComplexKeyWithSQL() throws 
Exception {
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 048721a4..beb4cafd 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.ignite.spi.systemview.view.SystemView;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT;
@@ -47,7 +48,7 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgnite(srcCluster[i].configuration(), 
destClusterCliCfg[i], destCluster, cache));
+            futs.add(igniteToIgnite(srcCluster[i].configuration(), 
destClusterCliCfg[i], destCluster, cache, "cdc-src-" + i));
 
         return futs;
     }
@@ -56,11 +57,15 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
     @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
-        for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgnite(srcCluster[i].configuration(), 
destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE));
+        for (int i = 0; i < srcCluster.length; i++) {
+            futs.add(igniteToIgnite(
+                srcCluster[i].configuration(), destClusterCliCfg[i], 
destCluster, ACTIVE_ACTIVE_CACHE, "cdc-src-" + i));
+        }
 
-        for (int i = 0; i < destCluster.length; i++)
-            futs.add(igniteToIgnite(destCluster[i].configuration(), 
srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE));
+        for (int i = 0; i < destCluster.length; i++) {
+            futs.add(igniteToIgnite(
+                destCluster[i].configuration(), srcClusterCliCfg[i], 
srcCluster, ACTIVE_ACTIVE_CACHE, "cdc-dest-" + i));
+        }
 
         return futs;
     }
@@ -81,13 +86,15 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
      * @param destCfg Ignite destination cluster configuration.
      * @param dest Ignite destination cluster.
      * @param cache Cache name to stream to kafka.
+     * @param threadName Thread to run CDC instance.
      * @return Future for Change Data Capture application.
      */
     protected IgniteInternalFuture<?> igniteToIgnite(
         IgniteConfiguration srcCfg,
         IgniteConfiguration destCfg,
         IgniteEx[] dest,
-        String cache
+        String cache,
+        @Nullable String threadName
     ) {
         return runAsync(() -> {
             CdcConfiguration cdcCfg = new CdcConfiguration();
@@ -117,7 +124,7 @@ public class CdcIgniteToIgniteReplicationTest extends 
AbstractReplicationTest {
             cdcs.add(cdc);
 
             cdc.run();
-        });
+        }, threadName);
     }
 
     /** */

Reply via email to