This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new f6ee5b31 IGNITE-17449 TTL support for CDC (#206)
f6ee5b31 is described below

commit f6ee5b316fa1307dea2e8c8f96d1072bae875b22
Author: Nikolay <[email protected]>
AuthorDate: Wed Mar 15 19:38:42 2023 +0300

    IGNITE-17449 TTL support for CDC (#206)
---
 .../ignite/cdc/AbstractCdcEventsApplier.java       |  4 ++--
 .../apache/ignite/cdc/CdcEventsIgniteApplier.java  | 12 +++++-----
 .../CacheVersionConflictResolverImpl.java          | 26 +++++++++++++++++++++-
 .../DebugCacheVersionConflictResolverImpl.java     |  2 ++
 .../cdc/thin/CdcEventsIgniteClientApplier.java     | 10 ++++-----
 .../apache/ignite/cdc/AbstractReplicationTest.java | 10 +++++++--
 6 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
index ffef9111..479a1db4 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
@@ -87,7 +87,7 @@ public abstract class AbstractCdcEventsApplier<K, V> {
             if (evt.value() != null) {
                 evtsApplied += applyIf(currCacheId, () -> 
isApplyBatch(updBatch, key), hasRemoves);
 
-                updBatch.put(key, toValue(currCacheId, evt.value(), ver));
+                updBatch.put(key, toValue(currCacheId, evt, ver));
             }
             else {
                 evtsApplied += applyIf(currCacheId, hasUpdates, () -> 
isApplyBatch(rmvBatch, key));
@@ -152,7 +152,7 @@ public abstract class AbstractCdcEventsApplier<K, V> {
     protected abstract K toKey(CdcEvent evt);
 
     /** @return Value. */
-    protected abstract V toValue(int cacheId, Object val, GridCacheVersion 
ver);
+    protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion 
ver);
 
     /** Stores DR data. */
     protected abstract void putAllConflict(int cacheId, Map<K, V> drMap);
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
index bce50d38..9afda275 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.util.collection.IntMap;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_ETERNAL;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
 
 /**
  * Contains logic to process {@link CdcEvent} and apply them to the 
destination cluster.
@@ -93,16 +93,18 @@ public class CdcEventsIgniteApplier extends 
AbstractCdcEventsApplier<KeyCacheObj
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheDrInfo toValue(int cacheId, Object val, 
GridCacheVersion ver) {
+    @Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, 
GridCacheVersion ver) {
         CacheObject cacheObj;
 
+        Object val = evt.value();
+
         if (val instanceof CacheObject)
             cacheObj = (CacheObject)val;
         else
             cacheObj = new CacheObjectImpl(val, null);
 
-        return cache(cacheId).configuration().getExpiryPolicyFactory() != null 
?
-            new GridCacheDrExpirationInfo(cacheObj, ver, TTL_NOT_CHANGED, 
EXPIRE_TIME_CALCULATE) :
+        return evt.expireTime() != EXPIRE_TIME_ETERNAL ?
+            new GridCacheDrExpirationInfo(cacheObj, ver, TTL_ETERNAL, 
evt.expireTime()) :
             new GridCacheDrInfo(cacheObj, ver);
     }
 
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
index 56e764fd..ce1a17fc 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
@@ -24,6 +24,7 @@ import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictR
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -88,7 +89,30 @@ public class CacheVersionConflictResolverImpl implements 
CacheVersionConflictRes
     ) {
         GridCacheVersionConflictContext<K, V> res = new 
GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
 
-        if (isUseNew(ctx, oldEntry, newEntry))
+        boolean expireExists = oldEntry.ttl() != CU.TTL_ETERNAL
+            || newEntry.ttl() != CU.TTL_ETERNAL
+            || oldEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL
+            || newEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL;
+
+        boolean useNew = isUseNew(ctx, oldEntry, newEntry);
+
+        if (expireExists) {
+            if (newEntry.expireTime() > oldEntry.expireTime()) {
+                res.merge(
+                    useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
+                    newEntry.ttl(),
+                    newEntry.expireTime()
+                );
+            }
+            else {
+                res.merge(
+                    useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
+                    oldEntry.ttl(),
+                    oldEntry.expireTime()
+                );
+            }
+        }
+        else if (useNew)
             res.useNew();
         else
             res.useOld();
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
index cb3ce321..000cbf37 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
@@ -54,6 +54,8 @@ public class DebugCacheVersionConflictResolverImpl extends 
CacheVersionConflictR
             "start=" + oldEntry.isStartVersion() +
             ", oldVer=" + oldEntry.version() +
             ", newVer=" + newEntry.version() +
+            ", oldExpire=[" + oldEntry.ttl() + "," + oldEntry.expireTime() + 
']' +
+            ", newExpire=[" + newEntry.ttl() + "," + newEntry.expireTime() + 
']' +
             ", old=" + oldVal +
             ", new=" + newVal +
             ", res=" + res + ']');
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
index 5484277a..2a991213 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.client.thin.TcpClientCache;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.collection.IntHashMap;
 import org.apache.ignite.internal.util.collection.IntMap;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /**
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
  * @see TcpClientCache#putAllConflict(Map)
  * @see TcpClientCache#removeAllConflict(Map)
  */
-public class CdcEventsIgniteClientApplier extends 
AbstractCdcEventsApplier<Object, T2<Object, GridCacheVersion>> {
+public class CdcEventsIgniteClientApplier extends 
AbstractCdcEventsApplier<Object, T3<Object, GridCacheVersion, Long>> {
     /** Client connected to the destination cluster. */
     private final IgniteClient client;
 
@@ -59,12 +59,12 @@ public class CdcEventsIgniteClientApplier extends 
AbstractCdcEventsApplier<Objec
     }
 
     /** {@inheritDoc} */
-    @Override protected T2<Object, GridCacheVersion> toValue(int cacheId, 
Object val, GridCacheVersion ver) {
-        return new T2<>(val, ver);
+    @Override protected T3<Object, GridCacheVersion, Long> toValue(int 
cacheId, CdcEvent evt, GridCacheVersion ver) {
+        return new T3<>(evt.value(), ver, evt.expireTime());
     }
 
     /** {@inheritDoc} */
-    @Override protected void putAllConflict(int cacheId, Map<Object, 
T2<Object, GridCacheVersion>> drMap) {
+    @Override protected void putAllConflict(int cacheId, Map<Object, 
T3<Object, GridCacheVersion, Long>> drMap) {
         cache(cacheId).putAllConflict(drMap);
     }
 
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 144b8b87..02d394d8 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -446,7 +446,7 @@ public abstract class AbstractReplicationTest extends 
GridCommonAbstractTest {
     /** Test that destination cluster applies expiration policy on received 
entries. */
     @Test
     public void testWithExpiryPolicy() throws Exception {
-        Factory<? extends ExpiryPolicy> factory = () -> new 
CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 10));
+        Factory<? extends ExpiryPolicy> factory = () -> new 
CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 30));
 
         IgniteCache<Integer, ConflictResolvableTestData> srcCache = 
createCache(srcCluster[0], ACTIVE_PASSIVE_CACHE, factory);
         IgniteCache<Integer, ConflictResolvableTestData> destCache = 
createCache(destCluster[0], ACTIVE_PASSIVE_CACHE, factory);
@@ -465,7 +465,13 @@ public abstract class AbstractReplicationTest extends 
GridCommonAbstractTest {
             assertTrue(waitForCondition(() -> !srcCache.containsKey(0), 
getTestTimeout()));
 
             log.warning(">>>>>> Waiting for removing in destination cache");
-            assertTrue(waitForCondition(() -> !destCache.containsKey(0), 
20_000));
+
+            Duration ttl = factory.create().getExpiryForCreation();
+
+            assertTrue(waitForCondition(
+                () -> !destCache.containsKey(0),
+                2 * TimeUnit.MILLISECONDS.convert(ttl.getDurationAmount(), 
ttl.getTimeUnit())
+            ));
         }
         finally {
             for (IgniteInternalFuture<?> fut : futs)

Reply via email to