This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new f6ee5b31 IGNITE-17449 TTL support for CDC (#206)
f6ee5b31 is described below
commit f6ee5b316fa1307dea2e8c8f96d1072bae875b22
Author: Nikolay <[email protected]>
AuthorDate: Wed Mar 15 19:38:42 2023 +0300
IGNITE-17449 TTL support for CDC (#206)
---
.../ignite/cdc/AbstractCdcEventsApplier.java | 4 ++--
.../apache/ignite/cdc/CdcEventsIgniteApplier.java | 12 +++++-----
.../CacheVersionConflictResolverImpl.java | 26 +++++++++++++++++++++-
.../DebugCacheVersionConflictResolverImpl.java | 2 ++
.../cdc/thin/CdcEventsIgniteClientApplier.java | 10 ++++-----
.../apache/ignite/cdc/AbstractReplicationTest.java | 10 +++++++--
6 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
index ffef9111..479a1db4 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
@@ -87,7 +87,7 @@ public abstract class AbstractCdcEventsApplier<K, V> {
if (evt.value() != null) {
evtsApplied += applyIf(currCacheId, () ->
isApplyBatch(updBatch, key), hasRemoves);
- updBatch.put(key, toValue(currCacheId, evt.value(), ver));
+ updBatch.put(key, toValue(currCacheId, evt, ver));
}
else {
evtsApplied += applyIf(currCacheId, hasUpdates, () ->
isApplyBatch(rmvBatch, key));
@@ -152,7 +152,7 @@ public abstract class AbstractCdcEventsApplier<K, V> {
protected abstract K toKey(CdcEvent evt);
/** @return Value. */
- protected abstract V toValue(int cacheId, Object val, GridCacheVersion
ver);
+ protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion
ver);
/** Stores DR data. */
protected abstract void putAllConflict(int cacheId, Map<K, V> drMap);
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
index bce50d38..9afda275 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
-import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_ETERNAL;
+import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
/**
* Contains logic to process {@link CdcEvent} and apply them to the
destination cluster.
@@ -93,16 +93,18 @@ public class CdcEventsIgniteApplier extends
AbstractCdcEventsApplier<KeyCacheObj
}
/** {@inheritDoc} */
- @Override protected GridCacheDrInfo toValue(int cacheId, Object val,
GridCacheVersion ver) {
+ @Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt,
GridCacheVersion ver) {
CacheObject cacheObj;
+ Object val = evt.value();
+
if (val instanceof CacheObject)
cacheObj = (CacheObject)val;
else
cacheObj = new CacheObjectImpl(val, null);
- return cache(cacheId).configuration().getExpiryPolicyFactory() != null
?
- new GridCacheDrExpirationInfo(cacheObj, ver, TTL_NOT_CHANGED,
EXPIRE_TIME_CALCULATE) :
+ return evt.expireTime() != EXPIRE_TIME_ETERNAL ?
+ new GridCacheDrExpirationInfo(cacheObj, ver, TTL_ETERNAL,
evt.expireTime()) :
new GridCacheDrInfo(cacheObj, ver);
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
index 56e764fd..ce1a17fc 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
@@ -24,6 +24,7 @@ import
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictR
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -88,7 +89,30 @@ public class CacheVersionConflictResolverImpl implements
CacheVersionConflictRes
) {
GridCacheVersionConflictContext<K, V> res = new
GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
- if (isUseNew(ctx, oldEntry, newEntry))
+ boolean expireExists = oldEntry.ttl() != CU.TTL_ETERNAL
+ || newEntry.ttl() != CU.TTL_ETERNAL
+ || oldEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL
+ || newEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL;
+
+ boolean useNew = isUseNew(ctx, oldEntry, newEntry);
+
+ if (expireExists) {
+ if (newEntry.expireTime() > oldEntry.expireTime()) {
+ res.merge(
+ useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
+ newEntry.ttl(),
+ newEntry.expireTime()
+ );
+ }
+ else {
+ res.merge(
+ useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
+ oldEntry.ttl(),
+ oldEntry.expireTime()
+ );
+ }
+ }
+ else if (useNew)
res.useNew();
else
res.useOld();
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
index cb3ce321..000cbf37 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/DebugCacheVersionConflictResolverImpl.java
@@ -54,6 +54,8 @@ public class DebugCacheVersionConflictResolverImpl extends
CacheVersionConflictR
"start=" + oldEntry.isStartVersion() +
", oldVer=" + oldEntry.version() +
", newVer=" + newEntry.version() +
+ ", oldExpire=[" + oldEntry.ttl() + "," + oldEntry.expireTime() +
']' +
+ ", newExpire=[" + newEntry.ttl() + "," + newEntry.expireTime() +
']' +
", old=" + oldVal +
", new=" + newVal +
", res=" + res + ']');
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
index 5484277a..2a991213 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.client.thin.TcpClientCache;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.collection.IntHashMap;
import org.apache.ignite.internal.util.collection.IntMap;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
/**
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
* @see TcpClientCache#putAllConflict(Map)
* @see TcpClientCache#removeAllConflict(Map)
*/
-public class CdcEventsIgniteClientApplier extends
AbstractCdcEventsApplier<Object, T2<Object, GridCacheVersion>> {
+public class CdcEventsIgniteClientApplier extends
AbstractCdcEventsApplier<Object, T3<Object, GridCacheVersion, Long>> {
/** Client connected to the destination cluster. */
private final IgniteClient client;
@@ -59,12 +59,12 @@ public class CdcEventsIgniteClientApplier extends
AbstractCdcEventsApplier<Objec
}
/** {@inheritDoc} */
- @Override protected T2<Object, GridCacheVersion> toValue(int cacheId,
Object val, GridCacheVersion ver) {
- return new T2<>(val, ver);
+ @Override protected T3<Object, GridCacheVersion, Long> toValue(int
cacheId, CdcEvent evt, GridCacheVersion ver) {
+ return new T3<>(evt.value(), ver, evt.expireTime());
}
/** {@inheritDoc} */
- @Override protected void putAllConflict(int cacheId, Map<Object,
T2<Object, GridCacheVersion>> drMap) {
+ @Override protected void putAllConflict(int cacheId, Map<Object,
T3<Object, GridCacheVersion, Long>> drMap) {
cache(cacheId).putAllConflict(drMap);
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 144b8b87..02d394d8 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -446,7 +446,7 @@ public abstract class AbstractReplicationTest extends
GridCommonAbstractTest {
/** Test that destination cluster applies expiration policy on received
entries. */
@Test
public void testWithExpiryPolicy() throws Exception {
- Factory<? extends ExpiryPolicy> factory = () -> new
CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 10));
+ Factory<? extends ExpiryPolicy> factory = () -> new
CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 30));
IgniteCache<Integer, ConflictResolvableTestData> srcCache =
createCache(srcCluster[0], ACTIVE_PASSIVE_CACHE, factory);
IgniteCache<Integer, ConflictResolvableTestData> destCache =
createCache(destCluster[0], ACTIVE_PASSIVE_CACHE, factory);
@@ -465,7 +465,13 @@ public abstract class AbstractReplicationTest extends
GridCommonAbstractTest {
assertTrue(waitForCondition(() -> !srcCache.containsKey(0),
getTestTimeout()));
log.warning(">>>>>> Waiting for removing in destination cache");
- assertTrue(waitForCondition(() -> !destCache.containsKey(0),
20_000));
+
+ Duration ttl = factory.create().getExpiryForCreation();
+
+ assertTrue(waitForCondition(
+ () -> !destCache.containsKey(0),
+ 2 * TimeUnit.MILLISECONDS.convert(ttl.getDurationAmount(),
ttl.getTimeUnit())
+ ));
}
finally {
for (IgniteInternalFuture<?> fut : futs)