This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 721ea0b8502 IGNITE-17449 Support expire time in CDC (#10583)
721ea0b8502 is described below

commit 721ea0b85023523c295ba68c1accee7e18fe6522
Author: Nikolay <[email protected]>
AuthorDate: Mon Mar 13 17:02:13 2023 +0300

    IGNITE-17449 Support expire time in CDC (#10583)
---
 .../clients/AbstractClientCompatibilityTest.java   |  3 +
 .../clients/JavaThinCompatibilityTest.java         | 10 +--
 .../main/java/org/apache/ignite/cdc/CdcEvent.java  | 10 +++
 .../apache/ignite/internal/cdc/CdcEventImpl.java   | 21 +++++-
 .../ignite/internal/cdc/WalRecordsConsumer.java    |  3 +-
 .../internal/client/thin/TcpClientCache.java       |  9 +--
 .../cache/ClientCachePutAllConflictRequest.java    | 10 ++-
 .../java/org/apache/ignite/cdc/CdcSelfTest.java    | 81 ++++++++++++++++++++++
 .../client/thin/DataReplicationOperationsTest.java | 70 +++++++++++++++----
 9 files changed, 188 insertions(+), 29 deletions(-)

diff --git 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
index 4d04a10322c..850726a7f41 100644
--- 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
+++ 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
@@ -75,6 +75,9 @@ public abstract class AbstractClientCompatibilityTest extends 
IgniteCompatibilit
     /** Version 2.14.0. */
     protected static final IgniteProductVersion VER_2_14_0 = 
IgniteProductVersion.fromString("2.14.0");
 
+    /** Version 2.15.0. */
+    protected static final IgniteProductVersion VER_2_15_0 = 
IgniteProductVersion.fromString("2.15.0");
+
     /** Parameters. */
     @Parameterized.Parameters(name = "Version {0}")
     public static Iterable<Object[]> versions() {
diff --git 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index cee1d214ef3..ad666ced379 100644
--- 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++ 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -63,8 +63,9 @@ import org.apache.ignite.internal.client.thin.TcpClientCache;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.platform.PlatformType;
 import org.apache.ignite.resources.ServiceContextResource;
@@ -448,8 +449,8 @@ public class JavaThinCompatibilityTest extends 
AbstractClientCompatibilityTest {
             }
         }
 
-        if (clientVer.compareTo(VER_2_14_0) >= 0)
-            testDataReplicationOperations(serverVer.compareTo(VER_2_14_0) >= 
0);
+        if (clientVer.compareTo(VER_2_15_0) >= 0)
+            testDataReplicationOperations(serverVer.compareTo(VER_2_15_0) >= 
0);
 
         if (clientVer.compareTo(VER_2_14_0) >= 0)
             new JavaThinIndexQueryCompatibilityTest().testIndexQueries(ADDR, 
serverVer.compareTo(VER_2_14_0) >= 0);
@@ -515,7 +516,8 @@ public class JavaThinCompatibilityTest extends 
AbstractClientCompatibilityTest {
             TcpClientCache<Object, Object> cache = (TcpClientCache<Object, 
Object>)client
                 .getOrCreateCache("test-cache-replication");
 
-            Map<Object, T2<Object, GridCacheVersion>> puts = F.asMap(1, new 
T2<>(1, new GridCacheVersion(1, 1, 1, 2)));
+            Map<Object, T3<Object, GridCacheVersion, Long>> puts =
+                F.asMap(1, new T3<>(1, new GridCacheVersion(1, 1, 1, 2), 
U.currentTimeMillis() + 1000));
 
             Map<Object, GridCacheVersion> rmvs = F.asMap(1, new 
GridCacheVersion(1, 1, 1, 2));
 
diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java 
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
index 2f8f7a5a665..5f1a490fa6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.cdc;
 
 import java.io.Serializable;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.internal.cdc.CdcMain;
@@ -76,4 +78,12 @@ public interface CdcEvent extends Serializable {
      * @see CacheView#cacheId()
      */
     public int cacheId();
+
+    /**
+     * @return Time when entry will be removed from cache. If {@code 0} then 
entry will be cached until removed.
+     * @see org.apache.ignite.IgniteCache#withExpiryPolicy(ExpiryPolicy)
+     * @see 
org.apache.ignite.configuration.CacheConfiguration#setExpiryPolicyFactory(Factory)
+     * @see 
org.apache.ignite.internal.processors.cache.GridCacheUtils#EXPIRE_TIME_ETERNAL
+     */
+    public long expireTime();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java
index a12aa0e388a..96f6adfd079 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java
@@ -51,6 +51,9 @@ public class CdcEventImpl implements CdcEvent {
     /** Cache id. */
     private final int cacheId;
 
+    /** Expire time. */
+    private final long expireTime;
+
     /**
      * @param key Key.
      * @param val Value.
@@ -58,15 +61,24 @@ public class CdcEventImpl implements CdcEvent {
      * @param part Partition.
      * @param ord Order of the entry change.
      * @param cacheId Cache id.
+     * @param expireTime Expire time.
      */
-    public CdcEventImpl(Object key, Object val, boolean primary, int part,
-        CacheEntryVersion ord, int cacheId) {
+    public CdcEventImpl(
+        Object key,
+        Object val,
+        boolean primary,
+        int part,
+        CacheEntryVersion ord,
+        int cacheId,
+        long expireTime
+    ) {
         this.key = key;
         this.val = val;
         this.primary = primary;
         this.part = part;
         this.ord = ord;
         this.cacheId = cacheId;
+        this.expireTime = expireTime;
     }
 
     /** {@inheritDoc} */
@@ -99,6 +111,11 @@ public class CdcEventImpl implements CdcEvent {
         return cacheId;
     }
 
+    /** {@inheritDoc} */
+    @Override public long expireTime() {
+        return expireTime;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CdcEventImpl.class, this);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 3d450d6cd61..8a004031644 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -99,7 +99,8 @@ public class WalRecordsConsumer<K, V> {
             (e.flags() & DataEntry.PRIMARY_FLAG) != 0,
             e.partitionId(),
             e.writeVersion(),
-            e.cacheId()
+            e.cacheId(),
+            e.expireTime()
         );
     };
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 62dd9f1d83e..5edf586652b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -57,7 +57,7 @@ import 
org.apache.ignite.internal.cache.query.InIndexQueryCriterion;
 import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
 import 
org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -927,7 +927,7 @@ public class TcpClientCache<K, V> implements ClientCache<K, 
V> {
      *
      * @param drMap DR map.
      */
-    public void putAllConflict(Map<? extends K, ? extends T2<? extends V, 
GridCacheVersion>> drMap) throws ClientException {
+    public void putAllConflict(Map<? extends K, ? extends T3<? extends V, 
GridCacheVersion, Long>> drMap) throws ClientException {
         A.notNull(drMap, "drMap");
 
         ch.request(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> 
writePutAllConflict(drMap, req));
@@ -939,7 +939,7 @@ public class TcpClientCache<K, V> implements ClientCache<K, 
V> {
      * @param drMap DR map.
      * @return Future.
      */
-    public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T2<? 
extends V, GridCacheVersion>> drMap)
+    public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T3<? 
extends V, GridCacheVersion, Long>> drMap)
         throws ClientException {
         A.notNull(drMap, "drMap");
 
@@ -1310,7 +1310,7 @@ public class TcpClientCache<K, V> implements 
ClientCache<K, V> {
 
     /** */
     private void writePutAllConflict(
-        Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> map,
+        Map<? extends K, ? extends T3<? extends V, GridCacheVersion, Long>> 
map,
         PayloadOutputChannel req
     ) {
         checkDataReplicationSupported(req.clientChannel().protocolCtx());
@@ -1324,6 +1324,7 @@ public class TcpClientCache<K, V> implements 
ClientCache<K, V> {
                 serDes.writeObject(out, e.getKey());
                 serDes.writeObject(out, e.getValue().get1());
                 serDes.writeObject(out, e.getValue().get2());
+                out.writeLong(e.getValue().get3());
             });
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
index 288839c40be..ac3182b751b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
 import 
org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
@@ -62,10 +63,13 @@ public class ClientCachePutAllConflictRequest extends 
ClientCacheDataRequest imp
             KeyCacheObject key = readCacheObject(reader, true);
             CacheObject val = readCacheObject(reader, false);
             GridCacheVersion ver = 
(GridCacheVersion)reader.readObjectDetached();
+            long expireTime = reader.readLong();
 
-            GridCacheDrInfo info = expPlc ?
-                new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, 
EXPIRE_TIME_CALCULATE) :
-                new GridCacheDrInfo(val, ver);
+            GridCacheDrInfo info = expireTime != CU.EXPIRE_TIME_ETERNAL ?
+                new GridCacheDrExpirationInfo(val, ver, CU.TTL_ETERNAL, 
expireTime) :
+                (expPlc
+                    ? new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, 
EXPIRE_TIME_CALCULATE)
+                    : new GridCacheDrInfo(val, ver));
 
             map.put(key, info);
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index 3d456f2e729..b836e6facd6 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -58,9 +58,11 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.Ignite
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
 import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import 
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.RunnableX;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
 import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;
@@ -90,6 +92,12 @@ public class CdcSelfTest extends AbstractCdcTest {
     /** */
     public static final String TX_CACHE_NAME = "tx-cache";
 
+    /** */
+    public static final long CREATE_TTL = 500_000L;
+
+    /** */
+    public static final long UPDATE_TTL = 60_000L;
+
     /** */
     @Parameterized.Parameter
     public boolean specificConsistentId;
@@ -178,6 +186,79 @@ public class CdcSelfTest extends AbstractCdcTest {
         }, true);
     }
 
+    /** */
+    @Test
+    public void testReadExpireTime() throws Exception {
+        IgniteConfiguration cfg = getConfiguration("ignite-0");
+
+        Ignite ign = startGrid(cfg);
+
+        ign.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, User> cache = 
ign.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        IgniteCache<Integer, User> withExpiry =
+            cache.withExpiryPolicy(new PlatformExpiryPolicy(CREATE_TTL, 
UPDATE_TTL, 0L));
+
+        for (int i = 0; i < KEYS_CNT; i++) {
+            if (i % 2 == 0) {
+                withExpiry.put(i, createUser(i)); // Create.
+                withExpiry.put(i, createUser(i)); // Update.
+            }
+            else {
+                cache.put(i, createUser(i)); // Create.
+                cache.put(i, createUser(i)); // Update.
+            }
+        }
+
+        removeData(cache, 0, KEYS_CNT);
+
+        Set<Integer> seen = new HashSet<>();
+
+        UserCdcConsumer cnsmr = new UserCdcConsumer() {
+            /** {@inheritDoc} */
+            @Override public void checkEvent(CdcEvent evt) {
+                super.checkEvent(evt);
+
+                Integer key = (Integer)evt.key();
+
+                if (evt.value() == null || key % 2 != 0) {
+                    assertEquals("Expire time must not be set [key=" + key + 
']', CU.EXPIRE_TIME_ETERNAL, evt.expireTime());
+
+                    return;
+                }
+
+                assertTrue(
+                    "Expire must be set [key=" + key + ']',
+                    evt.expireTime() != CU.EXPIRE_TIME_ETERNAL
+                );
+
+                long ttl = evt.expireTime() - System.currentTimeMillis();
+
+                assertTrue("Expire for operation", ttl <= (seen.contains(key) 
? UPDATE_TTL : CREATE_TTL));
+
+                seen.add(key);
+            }
+        };
+
+        CdcMain cdcMain = createCdc(cnsmr, cfg);
+
+        IgniteInternalFuture<?> cdcFut = runAsync(cdcMain);
+
+        waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+        waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
+
+        cdcFut.cancel();
+
+        assertTrue(cnsmr.stopped());
+
+        assertEquals(KEYS_CNT / 2, seen.size());
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
     /** */
     private void readAll(UserCdcConsumer cnsmr, boolean offsetCommit) throws 
Exception {
         IgniteConfiguration cfg = getConfiguration("ignite-0");
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
index b68ca7eeddd..c0c78101d9e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
@@ -17,21 +17,25 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.client.ClientCacheConfiguration;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.client.Person;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
@@ -42,6 +46,9 @@ public class DataReplicationOperationsTest extends 
AbstractThinClientTest {
     /** Keys count. */
     private static final int KEYS_CNT = 10;
 
+    /** TTL. */
+    public static final int TTL = 1000;
+
     /** */
     private static IgniteClient client;
 
@@ -55,28 +62,42 @@ public class DataReplicationOperationsTest extends 
AbstractThinClientTest {
     @Parameterized.Parameter
     public boolean binary;
 
+    /** Cache mode. */
+    @Parameterized.Parameter(1)
+    public CacheAtomicityMode mode;
+
     /** @return Test parameters. */
-    @Parameterized.Parameters(name = "binary={0}")
+    @Parameterized.Parameters(name = "binary={0}, cacheMode={1}")
     public static Collection<Object[]> parameters() {
-        return cartesianProduct(F.asList(false, true));
+        List<Object[]> params = new ArrayList<>();
+
+        for (boolean binary : new boolean[]{false, true})
+            for (CacheAtomicityMode mode : new 
CacheAtomicityMode[]{TRANSACTIONAL, ATOMIC})
+                params.add(new Object[]{binary, mode});
+
+        return params;
     }
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGrid();
+        startGrids(2);
 
-        client = startClient(grid());
+        client = startClient(grid(0));
     }
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        grid().destroyCaches(grid().cacheNames());
+        grid(0).destroyCaches(grid(0).cacheNames());
 
-        cache = (TcpClientCache<Object, 
Object>)client.createCache(DEFAULT_CACHE_NAME);
+        ClientCacheConfiguration ccfg = new ClientCacheConfiguration()
+            .setName(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(mode);
+
+        cache = (TcpClientCache<Object, Object>)client.createCache(ccfg);
 
         if (binary)
             cache = (TcpClientCache<Object, Object>)cache.withKeepBinary();
@@ -92,7 +113,7 @@ public class DataReplicationOperationsTest extends 
AbstractThinClientTest {
     /** */
     @Test
     public void testPutAllConflict() {
-        Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+        Map<Object, T3<Object, GridCacheVersion, Long>> data = 
createPutAllData(CU.EXPIRE_TIME_ETERNAL);
 
         cache.putAllConflict(data);
 
@@ -121,10 +142,11 @@ public class DataReplicationOperationsTest extends 
AbstractThinClientTest {
     /** @throws Exception If fails. */
     @Test
     public void testWithExpiryPolicy() throws Exception {
-        PlatformExpiryPolicy expPlc = new PlatformExpiryPolicy(1000, 1000, 
1000);
+        PlatformExpiryPolicy expPlc = new PlatformExpiryPolicy(TTL, TTL, TTL);
 
         ClientCacheConfiguration ccfgWithExpPlc = new 
ClientCacheConfiguration()
             .setName("cache-with-expiry-policy")
+            .setAtomicityMode(mode)
             .setExpiryPolicy(expPlc);
 
         TcpClientCache<Object, Object> cache = (TcpClientCache<Object, 
Object>)client.getOrCreateCache(ccfgWithExpPlc);
@@ -132,7 +154,7 @@ public class DataReplicationOperationsTest extends 
AbstractThinClientTest {
         TcpClientCache<Object, Object> cacheWithExpPlc = binary ?
             (TcpClientCache<Object, Object>)cache.withKeepBinary() : cache;
 
-        Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+        Map<Object, T3<Object, GridCacheVersion, Long>> data = 
createPutAllData(CU.EXPIRE_TIME_ETERNAL);
 
         cacheWithExpPlc.putAllConflict(data);
 
@@ -144,16 +166,34 @@ public class DataReplicationOperationsTest extends 
AbstractThinClientTest {
         ));
     }
 
+    /** @throws Exception If fails. */
+    @Test
+    public void testWithPerEntryExpiry() throws Exception {
+        TcpClientCache<Object, Object> cache0 =
+            (TcpClientCache<Object, 
Object>)client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        TcpClientCache<Object, Object> cache = binary ?
+            (TcpClientCache<Object, Object>)cache0.withKeepBinary() : cache0;
+
+        Map<Object, T3<Object, GridCacheVersion, Long>> data = 
createPutAllData(System.currentTimeMillis() + TTL);
+
+        cache.putAllConflict(data);
+
+        assertTrue(cache.containsKeys(data.keySet()));
+
+        assertTrue(waitForCondition(() -> 
data.keySet().stream().noneMatch(cache::containsKey), 2 * TTL));
+    }
+
     /** */
-    private Map<Object, T2<Object, GridCacheVersion>> createPutAllData() {
-        Map<Object, T2<Object, GridCacheVersion>> map = new HashMap<>();
+    private Map<Object, T3<Object, GridCacheVersion, Long>> 
createPutAllData(long expireTime) {
+        Map<Object, T3<Object, GridCacheVersion, Long>> map = new HashMap<>();
 
         for (int i = 0; i < KEYS_CNT; i++) {
             Person key = new Person(i, "Person-" + i);
             Person val = new Person(i, "Person-" + i);
 
             map.put(binary ? client.binary().toBinary(key) : key,
-                new T2<>(binary ? client.binary().toBinary(val) : val, 
otherVer));
+                new T3<>(binary ? client.binary().toBinary(val) : val, 
otherVer, expireTime));
         }
 
         return map;

Reply via email to