This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch IGNITE-23388 in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
commit d1aeeb151ed72fffe264a96d40bd6459ffe90da8 Author: Maksim Timonin <[email protected]> AuthorDate: Mon Oct 14 14:03:30 2024 +0300 IGNITE-23388 Use TreeMap for collecting CdcEvents --- .../ignite/cdc/AbstractCdcEventsApplier.java | 51 ++++++++++++---- .../ignite/cdc/AbstractIgniteCdcStreamer.java | 2 +- .../apache/ignite/cdc/CdcEventsIgniteApplier.java | 13 +--- .../cdc/thin/CdcEventsIgniteClientApplier.java | 12 ++-- .../apache/ignite/cdc/AbstractReplicationTest.java | 70 ++++++++++++++++++---- .../cdc/CdcIgniteToIgniteReplicationTest.java | 21 ++++--- 6 files changed, 121 insertions(+), 48 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java index 479a1db4..3f0a35a8 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java @@ -17,12 +17,16 @@ package org.apache.ignite.cdc; -import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; import java.util.function.BooleanSupplier; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; @@ -31,15 +35,15 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFIN /** * Contains logic to process {@link CdcEvent} and apply them to the cluster. */ -public abstract class AbstractCdcEventsApplier<K, V> { +public abstract class AbstractCdcEventsApplier<V> { /** Maximum batch size. */ private final int maxBatchSize; /** Update batch. */ - private final Map<K, V> updBatch = new HashMap<>(); + private final Map<KeyCacheObject, V> updBatch = new TreeMap<>(this::compareKeyCacheObject); /** Remove batch. */ - private final Map<K, GridCacheVersion> rmvBatch = new HashMap<>(); + private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new TreeMap<>(this::compareKeyCacheObject); /** */ private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch); @@ -81,7 +85,7 @@ public abstract class AbstractCdcEventsApplier<K, V> { } CacheEntryVersion order = evt.version(); - K key = toKey(evt); + KeyCacheObject key = toKey(evt); GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()); if (evt.value() != null) { @@ -144,19 +148,46 @@ public abstract class AbstractCdcEventsApplier<K, V> { } /** @return {@code True} if update batch should be applied. */ - private boolean isApplyBatch(Map<K, ?> map, K key) { + private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) { return map.size() >= maxBatchSize || map.containsKey(key); } - /** @return Key. */ - protected abstract K toKey(CdcEvent evt); + /** @return Key as KeyCacheObject. */ + private KeyCacheObject toKey(CdcEvent evt) { + Object key = evt.key(); + + if (key instanceof KeyCacheObject) + return (KeyCacheObject)key; + else + return new KeyCacheObjectImpl(key, evt.keyBytes(), evt.partition()); + } + + /** Compares keys. */ + private int compareKeyCacheObject(KeyCacheObject key1, KeyCacheObject key2) { + int cmp = Integer.compare(key1.hashCode(), key2.hashCode()); + + if (cmp != 0) + return cmp; + + try { + // Bytes are cached in KeyCacheObject, because they are constructed from binary WAL segment files. + // Hence, no NPE is possible. + byte[] bytes1 = key1.valueBytes(null); + byte[] bytes2 = key2.valueBytes(null); + + return CacheDataTree.compareBytes(bytes1, bytes2); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to compare keys in CdcEvent", e); + } + } /** @return Value. */ protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion ver); /** Stores DR data. */ - protected abstract void putAllConflict(int cacheId, Map<K, V> drMap); + protected abstract void putAllConflict(int cacheId, Map<KeyCacheObject, V> drMap); /** Removes DR data. */ - protected abstract void removeAllConflict(int cacheId, Map<K, GridCacheVersion> drMap); + protected abstract void removeAllConflict(int cacheId, Map<KeyCacheObject, GridCacheVersion> drMap); } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java index f56a9954..ed453810 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java @@ -80,7 +80,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer { protected int maxBatchSize; /** Events applier. */ - protected AbstractCdcEventsApplier<?, ?> applier; + protected AbstractCdcEventsApplier<?> applier; /** Timestamp of last sent message. */ protected AtomicLongMetric lastEvtTs; diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java index 9afda275..76a51515 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java @@ -26,7 +26,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -44,7 +43,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETE * @see IgniteInternalCache#putAllConflict(Map) * @see IgniteInternalCache#removeAllConflict(Map) */ -public class CdcEventsIgniteApplier extends AbstractCdcEventsApplier<KeyCacheObject, GridCacheDrInfo> { +public class CdcEventsIgniteApplier extends AbstractCdcEventsApplier<GridCacheDrInfo> { /** Destination cluster. */ private final IgniteEx ignite; @@ -82,16 +81,6 @@ public class CdcEventsIgniteApplier extends AbstractCdcEventsApplier<KeyCacheObj } } - /** {@inheritDoc} */ - @Override protected KeyCacheObject toKey(CdcEvent evt) { - Object key = evt.key(); - - if (key instanceof KeyCacheObject) - return (KeyCacheObject)key; - else - return new KeyCacheObjectImpl(key, null, evt.partition()); - } - /** {@inheritDoc} */ @Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, GridCacheVersion ver) { CacheObject cacheObj; diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java index 2a991213..2efac3ee 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java @@ -23,6 +23,7 @@ import org.apache.ignite.cdc.AbstractCdcEventsApplier; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.client.IgniteClient; import org.apache.ignite.internal.client.thin.TcpClientCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.collection.IntHashMap; import org.apache.ignite.internal.util.collection.IntMap; @@ -35,7 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; * @see TcpClientCache#putAllConflict(Map) * @see TcpClientCache#removeAllConflict(Map) */ -public class CdcEventsIgniteClientApplier extends AbstractCdcEventsApplier<Object, T3<Object, GridCacheVersion, Long>> { +public class CdcEventsIgniteClientApplier extends AbstractCdcEventsApplier<T3<Object, GridCacheVersion, Long>> { /** Client connected to the destination cluster. */ private final IgniteClient client; @@ -53,23 +54,18 @@ public class CdcEventsIgniteClientApplier extends AbstractCdcEventsApplier<Objec this.client = client; } - /** {@inheritDoc} */ - @Override protected Object toKey(CdcEvent evt) { - return evt.key(); - } - /** {@inheritDoc} */ @Override protected T3<Object, GridCacheVersion, Long> toValue(int cacheId, CdcEvent evt, GridCacheVersion ver) { return new T3<>(evt.value(), ver, evt.expireTime()); } /** {@inheritDoc} */ - @Override protected void putAllConflict(int cacheId, Map<Object, T3<Object, GridCacheVersion, Long>> drMap) { + @Override protected void putAllConflict(int cacheId, Map<KeyCacheObject, T3<Object, GridCacheVersion, Long>> drMap) { cache(cacheId).putAllConflict(drMap); } /** {@inheritDoc} */ - @Override protected void removeAllConflict(int cacheId, Map<Object, GridCacheVersion> drMap) { + @Override protected void removeAllConflict(int cacheId, Map<KeyCacheObject, GridCacheVersion> drMap) { cache(cacheId).removeAllConflict(drMap); } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java index 9eddeea7..2542a077 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java @@ -18,13 +18,8 @@ package org.apache.ignite.cdc; import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; @@ -90,9 +85,8 @@ import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName; import static org.apache.ignite.internal.cdc.WalRecordsConsumer.EVTS_CNT; import static org.apache.ignite.internal.cdc.WalRecordsConsumer.LAST_EVT_TIME; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE; -import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; -import static org.apache.ignite.testframework.GridTestUtils.runAsync; -import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; +import static org.apache.ignite.testframework.GridTestUtils.*; +import static org.junit.Assume.assumeTrue; /** */ @RunWith(Parameterized.class) @@ -310,6 +304,62 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest { } } + /** Test that CDC instances don't lock each other while streaming same keys from primary and backup. */ + @Test + public void testConcurrentMixedKeys() throws Exception { + assumeTrue(backups > 0); + assumeTrue(atomicity == TRANSACTIONAL); + + List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE); + + try { + for (IgniteEx ign: F.asList(srcCluster[0], destCluster[0])) { + ign.createCache(new CacheConfiguration<TestKey, Integer>() + .setName(ACTIVE_PASSIVE_CACHE) + .setAtomicityMode(atomicity) + .setBackups(backups) + .setCacheMode(mode)); + } + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + int cnt = 0; + + // Setup bound for keys to increase probability to stream same keys through each CDC instance. + // The value should not be small to force CDC generates batches for putAllConflict call. + int keysCnt = 20; + + while (cnt++ < 10_000) { + srcCluster[rnd.nextInt(2)] + .cache(ACTIVE_PASSIVE_CACHE) + .put(new TestKey(rnd.nextInt(keysCnt), null), rnd.nextInt()); + + if (cnt % 1_000 == 0) + System.out.println("Load count = " + cnt); + } + + // Check that all data received. + assertTrue(waitForCondition(() -> { + IgniteCache<TestKey, Integer> srcCache = srcCluster[0].cache(ACTIVE_PASSIVE_CACHE); + IgniteCache<TestKey, Integer> destCache = destCluster[0].cache(ACTIVE_PASSIVE_CACHE); + + for (int i = 0; i < keysCnt; i++) { + Integer srcVal = srcCache.get(new TestKey(i, null)); + Integer destVal = destCache.get(new TestKey(i, null)); + + if (srcVal == null || !srcVal.equals(destVal)) + return false; + } + + return true; + + }, getTestTimeout())); + } + finally { + for (IgniteInternalFuture<?> fut : futs) + fut.cancel(); + } + } + /** Replication with complex SQL key. Data inserted via SQL. */ @Test public void testActivePassiveReplicationComplexKeyWithSQL() throws Exception { diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java index 048721a4..beb4cafd 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.spi.systemview.view.SystemView; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT; @@ -47,7 +48,7 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { List<IgniteInternalFuture<?>> futs = new ArrayList<>(); for (int i = 0; i < srcCluster.length; i++) - futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache)); + futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache, "cdc-src-" + i)); return futs; } @@ -56,11 +57,15 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() { List<IgniteInternalFuture<?>> futs = new ArrayList<>(); - for (int i = 0; i < srcCluster.length; i++) - futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE)); + for (int i = 0; i < srcCluster.length; i++) { + futs.add(igniteToIgnite( + srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE, "cdc-src-" + i)); + } - for (int i = 0; i < destCluster.length; i++) - futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE)); + for (int i = 0; i < destCluster.length; i++) { + futs.add(igniteToIgnite( + destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE, "cdc-dest-" + i)); + } return futs; } @@ -81,13 +86,15 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { * @param destCfg Ignite destination cluster configuration. * @param dest Ignite destination cluster. * @param cache Cache name to stream to kafka. + * @param threadName Thread to run CDC instance. * @return Future for Change Data Capture application. */ protected IgniteInternalFuture<?> igniteToIgnite( IgniteConfiguration srcCfg, IgniteConfiguration destCfg, IgniteEx[] dest, - String cache + String cache, + @Nullable String threadName ) { return runAsync(() -> { CdcConfiguration cdcCfg = new CdcConfiguration(); @@ -117,7 +124,7 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { cdcs.add(cdc); cdc.run(); - }); + }, threadName); } /** */
