This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fda39087a8c IGNITE-19020 Add ability to provide expireTime during
conflict resolution (#10594)
fda39087a8c is described below
commit fda39087a8c3bf23b25a7ef2117ac2e2296a3f15
Author: Nikolay <[email protected]>
AuthorDate: Wed Mar 15 19:24:45 2023 +0300
IGNITE-19020 Add ability to provide expireTime during conflict resolution
(#10594)
---
.../version/GridCacheVersionConflictContext.java | 12 +-
.../IgniteCacheExpiryPolicyAbstractTest.java | 163 +++++++++++++++++++++
2 files changed, 171 insertions(+), 4 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
index c7fb74aabea..42ea4bff9d2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.version;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -49,6 +48,9 @@ public class GridCacheVersionConflictContext<K, V> {
/** TTL. */
private long ttl;
+ /** Expire time. */
+ private long expireTime;
+
/** Manual resolve flag. */
private boolean manualResolve;
@@ -118,12 +120,13 @@ public class GridCacheVersionConflictContext<K, V> {
* Force cache to use neither old, nor new, but some other value passed as
argument. In this case old
* value will be replaced with merge value and update will be considered
as local.
* <p>
- * Also in case of merge you have to specify new TTL explicitly. For
unlimited TTL use {@code 0}.
+ * Also in case of merge you have to specify new TTL and expire time
explicitly. For unlimited TTL use {@code 0}.
*
* @param mergeVal Merge value or {@code null} to force remove.
* @param ttl Time to live in milliseconds (must be non-negative).
+ * @param expireTime Expire time.
*/
- public void merge(@Nullable V mergeVal, long ttl) {
+ public void merge(@Nullable V mergeVal, long ttl, long expireTime) {
if (ttl < 0)
throw new IllegalArgumentException("TTL must be non-negative: " +
ttl);
@@ -131,6 +134,7 @@ public class GridCacheVersionConflictContext<K, V> {
this.mergeVal = mergeVal;
this.ttl = ttl;
+ this.expireTime = expireTime;
}
/**
@@ -186,7 +190,7 @@ public class GridCacheVersionConflictContext<K, V> {
* @return Expire time.
*/
public long expireTime() {
- return isUseNew() ? newEntry.expireTime() : isUseOld() ?
oldEntry.expireTime() : CU.toExpireTime(ttl);
+ return isUseNew() ? newEntry.expireTime() : isUseOld() ?
oldEntry.expireTime() : expireTime;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index a9cf713fbbc..05d3d894720 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -38,21 +38,41 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
+import
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import
org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridPlainClosure2;
+import org.apache.ignite.internal.util.lang.GridPlainInClosure;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.AbstractCachePluginProvider;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
@@ -88,6 +108,38 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest
extends IgniteCacheAbs
/** */
private Integer lastKey = 0;
+ /** */
+ private boolean conflictResolverPlugin;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (conflictResolverPlugin) {
+ cfg.setPluginProviders(new AbstractTestPluginProvider() {
+ @Override public String name() {
+ return "ConflictResolverProvider";
+ }
+
+ @Override public CachePluginProvider
createCacheProvider(CachePluginContext ctx) {
+ if
(!ctx.igniteCacheConfiguration().getName().equals(DEFAULT_CACHE_NAME))
+ return null;
+
+ return new AbstractCachePluginProvider() {
+ @Override public Object createComponent(Class cls) {
+ if (cls != CacheConflictResolutionManager.class)
+ return null;
+
+ return new TestCacheConflictResolutionManager();
+ }
+ };
+ }
+ });
+ }
+
+ return cfg;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
// No-op.
@@ -209,6 +261,89 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest
extends IgniteCacheAbs
checkNoValue(F.asList(key));
}
+ /** @throws Exception If failed. */
+ @Test
+ public void testPutAllConflictWithResolverPluginTtlUpdateOrder() throws
Exception {
+ conflictResolverPlugin = true;
+
+ try {
+ startGrids();
+
+ final IgniteInternalCache<Integer, Object> cache =
grid(0).cachex(DEFAULT_CACHE_NAME);
+
+ GridCacheVersion ver = new GridCacheVersion(1, 1, 1, 2);
+
+ CacheObjectImpl val = new CacheObjectImpl(1, null);
+
+ doTestTtlUpdateOrder(
+ key -> cache.putAllConflict(F.asMap(
+ new KeyCacheObjectImpl(key, null, -1),
+ new GridCacheDrInfo(val, ver)
+ )),
+ (key, ttl) -> {
+ cache.putAllConflict(F.asMap(
+ new KeyCacheObjectImpl(key, null, -1),
+ new GridCacheDrExpirationInfo(val, ver, ttl,
CU.toExpireTime(ttl))
+ ));
+
+ return null;
+ }
+ );
+ }
+ finally {
+ conflictResolverPlugin = false;
+ }
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testTtlUpdateOrder() throws Exception {
+ startGrids();
+
+ final IgniteCache<Integer, Object> cache = jcache(0);
+
+ doTestTtlUpdateOrder(
+ key -> cache.put(key, 1),
+ (key, ttl) -> {
+ cache.withExpiryPolicy(new TestPolicy(ttl, ttl, ttl)).put(key,
1);
+
+ return null;
+ }
+ );
+ }
+
+ /** */
+ private void doTestTtlUpdateOrder(
+ GridPlainInClosure<Integer> withoutTtl,
+ GridPlainClosure2<Integer, Long, Void> withTtl
+ ) throws IgniteCheckedException {
+ final IgniteCache<Integer, Object> cache = jcache(0);
+
+ Integer key = primaryKey(cache);
+
+ long ttl = 2_000;
+
+ withoutTtl.apply(key);
+
+ assertTrue(cache.containsKey(key));
+
+ withTtl.apply(key, ttl);
+
+ assertTrue(cache.containsKey(key));
+
+ assertTrue(GridTestUtils.waitForCondition(() ->
!cache.containsKey(key), 3 * ttl));
+
+ withTtl.apply(key, ttl);
+
+ assertTrue(cache.containsKey(key));
+
+ withoutTtl.apply(key);
+
+ assertTrue(cache.containsKey(key));
+
+ assertTrue(GridTestUtils.waitForCondition(() ->
!cache.containsKey(key), 3 * ttl));
+ }
+
/**
* @throws Exception If failed.
*/
@@ -1408,4 +1543,32 @@ public abstract class
IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
return S.toString(TestPolicy.class, this);
}
}
+
+ /** */
+ public static class TestCacheConflictResolutionManager<K, V> extends
GridCacheManagerAdapter<K, V>
+ implements CacheConflictResolutionManager<K, V> {
+
+ /** {@inheritDoc} */
+ @Override public CacheVersionConflictResolver conflictResolver() {
+ return new CacheVersionConflictResolver() {
+ @Override public <K, V> GridCacheVersionConflictContext<K, V>
resolve(
+ CacheObjectValueContext ctx,
+ GridCacheVersionedEntryEx<K, V> oldEntry,
+ GridCacheVersionedEntryEx<K, V> newEntry,
+ boolean atomicVerComparator
+ ) {
+ GridCacheVersionConflictContext<K, V> res =
+ new GridCacheVersionConflictContext<>(ctx, oldEntry,
newEntry);
+
+ res.merge(
+ newEntry.value(ctx),
+ Math.max(oldEntry.ttl(), newEntry.ttl()),
+ Math.max(oldEntry.expireTime(), newEntry.expireTime())
+ );
+
+ return res;
+ }
+ };
+ }
+ }
}