This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new fcfb076d IGNITE-23388 Use SortedMap for collecting CdcEvents (#291)
fcfb076d is described below
commit fcfb076deba2621b296d2ab193c1ba0d16da60bf
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Oct 18 15:31:00 2024 +0300
IGNITE-23388 Use SortedMap for collecting CdcEvents (#291)
---
.../ignite/cdc/AbstractCdcEventsApplier.java | 18 +++++--
.../apache/ignite/cdc/AbstractReplicationTest.java | 57 ++++++++++++++++++++++
.../cdc/CdcIgniteToIgniteReplicationTest.java | 21 +++++---
.../cdc/kafka/CdcKafkaReplicationAppsTest.java | 10 ++--
.../ignite/cdc/kafka/CdcKafkaReplicationTest.java | 42 +++++++++++-----
5 files changed, 119 insertions(+), 29 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
index 479a1db4..ccf3f804 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
@@ -17,8 +17,9 @@
package org.apache.ignite.cdc;
-import java.util.HashMap;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -36,10 +37,10 @@ public abstract class AbstractCdcEventsApplier<K, V> {
private final int maxBatchSize;
/** Update batch. */
- private final Map<K, V> updBatch = new HashMap<>();
+ private final SortedMap<K, V> updBatch = new TreeMap<>(this::compareKey);
/** Remove batch. */
- private final Map<K, GridCacheVersion> rmvBatch = new HashMap<>();
+ private final SortedMap<K, GridCacheVersion> rmvBatch = new
TreeMap<>(this::compareKey);
/** */
private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
@@ -109,13 +110,12 @@ public abstract class AbstractCdcEventsApplier<K, V> {
* @param applyUpd Apply update batch flag supplier.
* @param applyRmv Apply remove batch flag supplier.
* @return Number of applied events.
- * @throws IgniteCheckedException In case of error.
*/
private int applyIf(
int cacheId,
BooleanSupplier applyUpd,
BooleanSupplier applyRmv
- ) throws IgniteCheckedException {
+ ) {
int evtsApplied = 0;
if (applyUpd.getAsBoolean()) {
@@ -151,6 +151,14 @@ public abstract class AbstractCdcEventsApplier<K, V> {
/** @return Key. */
protected abstract K toKey(CdcEvent evt);
+ /**
+ * Compares keys hash codes only, because bytes might not be available.
+ * If hash codes are equal it put {@code key2} to next batch, see {@link
#isApplyBatch)}.
+ */
+ private int compareKey(Object key1, Object key2) {
+ return Integer.compare(key1.hashCode(), key2.hashCode());
+ }
+
/** @return Value. */
protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion
ver);
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 9eddeea7..9a5e18c7 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -93,6 +94,7 @@ import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_P
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeTrue;
/** */
@RunWith(Parameterized.class)
@@ -310,6 +312,61 @@ public abstract class AbstractReplicationTest extends
GridCommonAbstractTest {
}
}
+ /** Test that CDC instances don't lock each other while streaming mixed
keys. */
+ @Test
+ public void testConcurrentMixedKeys() throws Exception {
+ assumeTrue(atomicity == TRANSACTIONAL);
+
+ for (IgniteEx ign: F.asList(srcCluster[0], destCluster[0])) {
+ ign.createCache(new CacheConfiguration<TestKey, Integer>()
+ .setName(ACTIVE_PASSIVE_CACHE)
+ .setAtomicityMode(atomicity)
+ .setBackups(backups)
+ .setCacheMode(mode));
+ }
+
+ List<IgniteInternalFuture<?>> futs =
startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
+
+ try {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ int cnt = 0;
+
+ // Setup bound for keys to increase probability to stream same
keys through each CDC instance.
+ // The value should not be small to force CDC generates batches
for putAllConflict call.
+ int keysCnt = 20;
+
+ while (cnt++ < 10_000) {
+ srcCluster[rnd.nextInt(2)]
+ .cache(ACTIVE_PASSIVE_CACHE)
+ .put(new TestKey(rnd.nextInt(keysCnt), null),
rnd.nextInt());
+
+ if (cnt % 1_000 == 0)
+ System.out.println("Load count = " + cnt);
+ }
+
+ // Check that all data received.
+ assertTrue(waitForCondition(() -> {
+ IgniteCache<TestKey, Integer> srcCache =
srcCluster[0].cache(ACTIVE_PASSIVE_CACHE);
+ IgniteCache<TestKey, Integer> destCache =
destCluster[0].cache(ACTIVE_PASSIVE_CACHE);
+
+ for (int i = 0; i < keysCnt; i++) {
+ Integer srcVal = srcCache.get(new TestKey(i, null));
+ Integer destVal = destCache.get(new TestKey(i, null));
+
+ if (srcVal == null || !srcVal.equals(destVal))
+ return false;
+ }
+
+ return true;
+
+ }, getTestTimeout()));
+ }
+ finally {
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.cancel();
+ }
+ }
+
/** Replication with complex SQL key. Data inserted via SQL. */
@Test
public void testActivePassiveReplicationComplexKeyWithSQL() throws
Exception {
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 048721a4..b6d42e24 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.spi.systemview.view.SystemView;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT;
@@ -47,7 +48,7 @@ public class CdcIgniteToIgniteReplicationTest extends
AbstractReplicationTest {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (int i = 0; i < srcCluster.length; i++)
- futs.add(igniteToIgnite(srcCluster[i].configuration(),
destClusterCliCfg[i], destCluster, cache));
+ futs.add(igniteToIgnite(srcCluster[i].configuration(),
destClusterCliCfg[i], destCluster, cache, "ignite-to-ignite-src-" + i));
return futs;
}
@@ -56,11 +57,15 @@ public class CdcIgniteToIgniteReplicationTest extends
AbstractReplicationTest {
@Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
- for (int i = 0; i < srcCluster.length; i++)
- futs.add(igniteToIgnite(srcCluster[i].configuration(),
destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE));
+ for (int i = 0; i < srcCluster.length; i++) {
+ futs.add(igniteToIgnite(
+ srcCluster[i].configuration(), destClusterCliCfg[i],
destCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-src-" + i));
+ }
- for (int i = 0; i < destCluster.length; i++)
- futs.add(igniteToIgnite(destCluster[i].configuration(),
srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE));
+ for (int i = 0; i < destCluster.length; i++) {
+ futs.add(igniteToIgnite(
+ destCluster[i].configuration(), srcClusterCliCfg[i],
srcCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-dest-" + i));
+ }
return futs;
}
@@ -81,13 +86,15 @@ public class CdcIgniteToIgniteReplicationTest extends
AbstractReplicationTest {
* @param destCfg Ignite destination cluster configuration.
* @param dest Ignite destination cluster.
* @param cache Cache name to stream to kafka.
+ * @param threadName Thread to run CDC instance.
* @return Future for Change Data Capture application.
*/
protected IgniteInternalFuture<?> igniteToIgnite(
IgniteConfiguration srcCfg,
IgniteConfiguration destCfg,
IgniteEx[] dest,
- String cache
+ String cache,
+ @Nullable String threadName
) {
return runAsync(() -> {
CdcConfiguration cdcCfg = new CdcConfiguration();
@@ -117,7 +124,7 @@ public class CdcIgniteToIgniteReplicationTest extends
AbstractReplicationTest {
cdcs.add(cdc);
cdc.run();
- });
+ }, threadName);
}
/** */
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 96f1d242..927a7b25 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -112,7 +112,8 @@ public class CdcKafkaReplicationAppsTest extends
CdcKafkaReplicationTest {
IgniteConfiguration igniteCfg,
String topic,
String metadataTopic,
- String cache
+ String cache,
+ String threadName
) {
Map<String, String> params = new HashMap<>();
@@ -127,7 +128,7 @@ public class CdcKafkaReplicationAppsTest extends
CdcKafkaReplicationTest {
params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
return runAsync(
- () -> CdcCommandLineStartup.main(new String[]
{prepareConfig("/replication/ignite-to-kafka.xml", params)})
+ () -> CdcCommandLineStartup.main(new String[]
{prepareConfig("/replication/ignite-to-kafka.xml", params)}), threadName
);
}
@@ -139,7 +140,8 @@ public class CdcKafkaReplicationAppsTest extends
CdcKafkaReplicationTest {
IgniteConfiguration igniteCfg,
IgniteEx[] dest,
int partFrom,
- int partTo
+ int partTo,
+ String threadName
) {
Map<String, String> params = new HashMap<>();
@@ -174,7 +176,7 @@ public class CdcKafkaReplicationAppsTest extends
CdcKafkaReplicationTest {
params.put(METRIC_REG_NAME, DFLT_METRICS_REG_NAME + "-" +
igniteCfg.getIgniteInstanceName());
return runAsync(
- () -> KafkaToIgniteCommandLineStartup.main(new String[]
{prepareConfig(cfg, params)})
+ () -> KafkaToIgniteCommandLineStartup.main(new String[]
{prepareConfig(cfg, params)}), threadName
);
}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 05c5154f..a56b2941 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -104,8 +104,11 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
- for (IgniteEx ex : srcCluster)
- futs.add(igniteToKafka(ex.configuration(), cache,
SRC_DEST_META_TOPIC, cache));
+ for (IgniteEx ex : srcCluster) {
+ int idx = getTestIgniteInstanceIndex(ex.name());
+
+ futs.add(igniteToKafka(ex.configuration(), cache,
SRC_DEST_META_TOPIC, cache, "ignite-src-to-kafka-" + idx));
+ }
for (int i = 0; i < destCluster.length; i++) {
futs.add(kafkaToIgnite(
@@ -115,7 +118,8 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
destClusterCliCfg[i],
destCluster,
i * (DFLT_PARTS / 2),
- (i + 1) * (DFLT_PARTS / 2)
+ (i + 1) * (DFLT_PARTS / 2),
+ "kafka-to-ignite-dest-" + i
));
}
@@ -126,11 +130,19 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
@Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
- for (IgniteEx ex : srcCluster)
- futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC,
SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE));
+ for (IgniteEx ex : srcCluster) {
+ int idx = getTestIgniteInstanceIndex(ex.name());
+
+ futs.add(igniteToKafka(
+ ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC,
ACTIVE_ACTIVE_CACHE, "ignite-src-to-kafka-" + idx));
+ }
- for (IgniteEx ex : destCluster)
- futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC,
DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE));
+ for (IgniteEx ex : destCluster) {
+ int idx = getTestIgniteInstanceIndex(ex.name());
+
+ futs.add(igniteToKafka(
+ ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC,
ACTIVE_ACTIVE_CACHE, "ignite-dest-to-kafka-" + idx));
+ }
futs.add(kafkaToIgnite(
ACTIVE_ACTIVE_CACHE,
@@ -139,7 +151,8 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
destClusterCliCfg[0],
destCluster,
0,
- DFLT_PARTS
+ DFLT_PARTS,
+ "kafka-to-ignite-src"
));
futs.add(kafkaToIgnite(
@@ -149,7 +162,8 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
srcClusterCliCfg[0],
srcCluster,
0,
- DFLT_PARTS
+ DFLT_PARTS,
+ "kafka-to-ignite-dest"
));
return futs;
@@ -247,7 +261,8 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
IgniteConfiguration igniteCfg,
String topic,
String metadataTopic,
- String cache
+ String cache,
+ String threadName
) {
return runAsync(() -> {
IgniteToKafkaCdcStreamer cdcCnsmr = new IgniteToKafkaCdcStreamer()
@@ -270,7 +285,7 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
cdcs.add(cdc);
cdc.run();
- });
+ }, threadName);
}
/**
@@ -286,7 +301,8 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
IgniteConfiguration igniteCfg,
IgniteEx[] dest,
int fromPart,
- int toPart
+ int toPart,
+ String threadName
) {
KafkaToIgniteCdcStreamerConfiguration cfg = new
KafkaToIgniteCdcStreamerConfiguration();
@@ -315,7 +331,7 @@ public class CdcKafkaReplicationTest extends
AbstractReplicationTest {
kafkaStreamers.add(streamer);
- return runAsync(streamer);
+ return runAsync(streamer, threadName);
}
/** */