This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 67dd34efe32 IGNITE-22299 Drop GridCacheVersion#otherClusterVersion for
cache resending (#11360)
67dd34efe32 is described below
commit 67dd34efe329ed11cb252f0f9d3a5598287b818d
Author: Maksim Timonin <[email protected]>
AuthorDate: Mon May 27 22:07:50 2024 +0300
IGNITE-22299 Drop GridCacheVersion#otherClusterVersion for cache resending
(#11360)
---
.../apache/ignite/util/CdcResendCommandTest.java | 182 +++++++++++++++++++++
.../management/cdc/CdcCacheDataResendTask.java | 10 +-
2 files changed, 191 insertions(+), 1 deletion(-)
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
index 81cb9864e6c..c333fd3ed0d 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
@@ -17,12 +17,41 @@
package org.apache.ignite.util;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcMain;
+import
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
@@ -53,9 +82,27 @@ public class CdcResendCommandTest extends
GridCommandHandlerAbstractTest {
cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setBackups(1));
+ cfg.setPluginProviders(new AbstractTestPluginProvider() {
+ @Override public String name() {
+ return "ConflictResolverProvider";
+ }
+
+ @Override public <T> T createComponent(PluginContext ctx, Class<T>
cls) {
+ if (cls != CacheConflictResolutionManager.class)
+ return null;
+
+ return (T)new AlwaysNewResolutionManager<>();
+ }
+ });
+
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
@@ -94,4 +141,139 @@ public class CdcResendCommandTest extends
GridCommandHandlerAbstractTest {
assertEquals(KEYS_CNT, ign.cache(DEFAULT_CACHE_NAME).size());
}
+
+ /** */
+ @Test
+ public void testResendConflictVersion() throws Exception {
+ IgniteEx ign = startGrid(0);
+
+ ign.cluster().state(ACTIVE);
+
+ enableCheckpoints(ign, false);
+
+ IgniteInternalCache<Integer, Integer> cachex =
ign.cachex(DEFAULT_CACHE_NAME);
+
+ // Put data.
+ cachex.put(0, 0);
+
+ // Override data from clusterId=2.
+ KeyCacheObject key = new KeyCacheObjectImpl(0, null,
cachex.affinity().partition(0));
+ CacheObject val = new CacheObjectImpl(1, null);
+ val.prepareMarshal(cachex.context().cacheObjectContext());
+
+ GridCacheVersion conflict = new GridCacheVersion(1, 0, 1, (byte)2);
+
+ Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();
+ drMap.put(key, new GridCacheDrInfo(val, conflict));
+
+ cachex.putAllConflict(drMap);
+
+ // Resend data.
+ executeCommand(EXIT_CODE_OK, CDC, RESEND, CACHES, DEFAULT_CACHE_NAME);
+
+ TestCdcConsumer cnsmr = new TestCdcConsumer();
+
+ CdcConfiguration cfg = new CdcConfiguration();
+ cfg.setConsumer(cnsmr);
+
+ CdcMain cdc = new CdcMain(ign.configuration(), null, cfg);
+ GridTestUtils.runAsync(cdc);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> cnsmr.events().size()
== 3, 10_000, 100));
+
+ CdcEvent ev0 = cnsmr.events().get(0);
+ assertEquals(0, ev0.key());
+ assertEquals(0, ev0.value());
+ assertNull(ev0.version().otherClusterVersion());
+
+ CdcEvent ev1 = cnsmr.events().get(1);
+ assertEquals(0, ev1.key());
+ assertEquals(1, ev1.value());
+ assertEquals(conflict, ev1.version().otherClusterVersion());
+
+ CdcEvent ev2 = cnsmr.events().get(2);
+ assertEquals(0, ev2.key());
+ assertEquals(1, ev2.value());
+ assertNull(ev2.version().otherClusterVersion());
+ }
+
+ /** */
+ private static class TestCdcConsumer implements CdcConsumer {
+ /** */
+ private final List<CdcEvent> events = new ArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override public boolean onEvents(Iterator<CdcEvent> events) {
+ synchronized (this) {
+ events.forEachRemaining(this.events::add);
+ }
+
+ return false;
+ }
+
+ /** */
+ synchronized List<CdcEvent> events() {
+ return events;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(MetricRegistry mreg) {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ types.forEachRemaining(t -> {});
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMappings(Iterator<TypeMapping> mappings) {
+ mappings.forEachRemaining(t -> {});
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent>
cacheEvents) {
+ cacheEvents.forEachRemaining(t -> {});
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheDestroy(Iterator<Integer> caches) {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // No-op
+ }
+ }
+
+ /** */
+ private static class AlwaysNewResolutionManager<K, V>
+ extends GridCacheManagerAdapter<K, V> implements
CacheConflictResolutionManager<K, V> {
+ /** */
+ private final CacheVersionConflictResolver rslv;
+
+ /** */
+ AlwaysNewResolutionManager() {
+ rslv = new CacheVersionConflictResolver() {
+ @Override public <K1, V1> GridCacheVersionConflictContext<K1,
V1> resolve(
+ CacheObjectValueContext ctx,
+ GridCacheVersionedEntryEx<K1, V1> oldEntry,
+ GridCacheVersionedEntryEx<K1, V1> newEntry,
+ boolean atomicVerComparator
+ ) {
+ GridCacheVersionConflictContext<K1, V1> res = new
GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
+
+ res.useNew();
+
+ return res;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheVersionConflictResolver conflictResolver() {
+ return rslv;
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
index 9e206493bc1..c38a94df8cd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
@@ -39,6 +39,8 @@ import
org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
@@ -195,13 +197,19 @@ public class CdcCacheDataResendTask extends
VisorMultiNodeTask<CdcResendCommandA
if (log.isTraceEnabled())
log.trace("Resend key: " + key);
+ GridCacheVersion ver = row.version();
+
+ // Entries must not hold otherClusterVersion to be inserted
into a receiver cluster.
+ if (ver instanceof GridCacheVersionEx)
+ ver = new GridCacheVersion(ver.topologyVersion(),
ver.order(), ver.nodeOrder(), ver.clusterId());
+
CdcDataRecord rec = new CdcDataRecord(new DataEntry(
cctx.cacheId(),
key,
row.value(),
GridCacheOperation.CREATE,
null,
- row.version(),
+ ver,
row.expireTime(),
key.partition(),
-1,