This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 721ea0b8502 IGNITE-17449 Support expire time in CDC (#10583)
721ea0b8502 is described below
commit 721ea0b85023523c295ba68c1accee7e18fe6522
Author: Nikolay <[email protected]>
AuthorDate: Mon Mar 13 17:02:13 2023 +0300
IGNITE-17449 Support expire time in CDC (#10583)
---
.../clients/AbstractClientCompatibilityTest.java | 3 +
.../clients/JavaThinCompatibilityTest.java | 10 +--
.../main/java/org/apache/ignite/cdc/CdcEvent.java | 10 +++
.../apache/ignite/internal/cdc/CdcEventImpl.java | 21 +++++-
.../ignite/internal/cdc/WalRecordsConsumer.java | 3 +-
.../internal/client/thin/TcpClientCache.java | 9 +--
.../cache/ClientCachePutAllConflictRequest.java | 10 ++-
.../java/org/apache/ignite/cdc/CdcSelfTest.java | 81 ++++++++++++++++++++++
.../client/thin/DataReplicationOperationsTest.java | 70 +++++++++++++++----
9 files changed, 188 insertions(+), 29 deletions(-)
diff --git
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
index 4d04a10322c..850726a7f41 100644
---
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
+++
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
@@ -75,6 +75,9 @@ public abstract class AbstractClientCompatibilityTest extends
IgniteCompatibilit
/** Version 2.14.0. */
protected static final IgniteProductVersion VER_2_14_0 =
IgniteProductVersion.fromString("2.14.0");
+ /** Version 2.15.0. */
+ protected static final IgniteProductVersion VER_2_15_0 =
IgniteProductVersion.fromString("2.15.0");
+
/** Parameters. */
@Parameterized.Parameters(name = "Version {0}")
public static Iterable<Object[]> versions() {
diff --git
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index cee1d214ef3..ad666ced379 100644
---
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -63,8 +63,9 @@ import org.apache.ignite.internal.client.thin.TcpClientCache;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.platform.PlatformType;
import org.apache.ignite.resources.ServiceContextResource;
@@ -448,8 +449,8 @@ public class JavaThinCompatibilityTest extends
AbstractClientCompatibilityTest {
}
}
- if (clientVer.compareTo(VER_2_14_0) >= 0)
- testDataReplicationOperations(serverVer.compareTo(VER_2_14_0) >=
0);
+ if (clientVer.compareTo(VER_2_15_0) >= 0)
+ testDataReplicationOperations(serverVer.compareTo(VER_2_15_0) >=
0);
if (clientVer.compareTo(VER_2_14_0) >= 0)
new JavaThinIndexQueryCompatibilityTest().testIndexQueries(ADDR,
serverVer.compareTo(VER_2_14_0) >= 0);
@@ -515,7 +516,8 @@ public class JavaThinCompatibilityTest extends
AbstractClientCompatibilityTest {
TcpClientCache<Object, Object> cache = (TcpClientCache<Object,
Object>)client
.getOrCreateCache("test-cache-replication");
- Map<Object, T2<Object, GridCacheVersion>> puts = F.asMap(1, new
T2<>(1, new GridCacheVersion(1, 1, 1, 2)));
+ Map<Object, T3<Object, GridCacheVersion, Long>> puts =
+ F.asMap(1, new T3<>(1, new GridCacheVersion(1, 1, 1, 2),
U.currentTimeMillis() + 1000));
Map<Object, GridCacheVersion> rmvs = F.asMap(1, new
GridCacheVersion(1, 1, 1, 2));
diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
index 2f8f7a5a665..5f1a490fa6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
@@ -18,6 +18,8 @@
package org.apache.ignite.cdc;
import java.io.Serializable;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.internal.cdc.CdcMain;
@@ -76,4 +78,12 @@ public interface CdcEvent extends Serializable {
* @see CacheView#cacheId()
*/
public int cacheId();
+
+ /**
+ * @return Time when entry will be removed from cache. If {@code 0} then
entry will be cached until removed.
+ * @see org.apache.ignite.IgniteCache#withExpiryPolicy(ExpiryPolicy)
+ * @see
org.apache.ignite.configuration.CacheConfiguration#setExpiryPolicyFactory(Factory)
+ * @see
org.apache.ignite.internal.processors.cache.GridCacheUtils#EXPIRE_TIME_ETERNAL
+ */
+ public long expireTime();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java
index a12aa0e388a..96f6adfd079 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java
@@ -51,6 +51,9 @@ public class CdcEventImpl implements CdcEvent {
/** Cache id. */
private final int cacheId;
+ /** Expire time. */
+ private final long expireTime;
+
/**
* @param key Key.
* @param val Value.
@@ -58,15 +61,24 @@ public class CdcEventImpl implements CdcEvent {
* @param part Partition.
* @param ord Order of the entry change.
* @param cacheId Cache id.
+ * @param expireTime Expire time.
*/
- public CdcEventImpl(Object key, Object val, boolean primary, int part,
- CacheEntryVersion ord, int cacheId) {
+ public CdcEventImpl(
+ Object key,
+ Object val,
+ boolean primary,
+ int part,
+ CacheEntryVersion ord,
+ int cacheId,
+ long expireTime
+ ) {
this.key = key;
this.val = val;
this.primary = primary;
this.part = part;
this.ord = ord;
this.cacheId = cacheId;
+ this.expireTime = expireTime;
}
/** {@inheritDoc} */
@@ -99,6 +111,11 @@ public class CdcEventImpl implements CdcEvent {
return cacheId;
}
+ /** {@inheritDoc} */
+ @Override public long expireTime() {
+ return expireTime;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CdcEventImpl.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 3d450d6cd61..8a004031644 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -99,7 +99,8 @@ public class WalRecordsConsumer<K, V> {
(e.flags() & DataEntry.PRIMARY_FLAG) != 0,
e.partitionId(),
e.writeVersion(),
- e.cacheId()
+ e.cacheId(),
+ e.expireTime()
);
};
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 62dd9f1d83e..5edf586652b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -57,7 +57,7 @@ import
org.apache.ignite.internal.cache.query.InIndexQueryCriterion;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import
org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -927,7 +927,7 @@ public class TcpClientCache<K, V> implements ClientCache<K,
V> {
*
* @param drMap DR map.
*/
- public void putAllConflict(Map<? extends K, ? extends T2<? extends V,
GridCacheVersion>> drMap) throws ClientException {
+ public void putAllConflict(Map<? extends K, ? extends T3<? extends V,
GridCacheVersion, Long>> drMap) throws ClientException {
A.notNull(drMap, "drMap");
ch.request(ClientOperation.CACHE_PUT_ALL_CONFLICT, req ->
writePutAllConflict(drMap, req));
@@ -939,7 +939,7 @@ public class TcpClientCache<K, V> implements ClientCache<K,
V> {
* @param drMap DR map.
* @return Future.
*/
- public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T2<?
extends V, GridCacheVersion>> drMap)
+ public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T3<?
extends V, GridCacheVersion, Long>> drMap)
throws ClientException {
A.notNull(drMap, "drMap");
@@ -1310,7 +1310,7 @@ public class TcpClientCache<K, V> implements
ClientCache<K, V> {
/** */
private void writePutAllConflict(
- Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> map,
+ Map<? extends K, ? extends T3<? extends V, GridCacheVersion, Long>>
map,
PayloadOutputChannel req
) {
checkDataReplicationSupported(req.clientChannel().protocolCtx());
@@ -1324,6 +1324,7 @@ public class TcpClientCache<K, V> implements
ClientCache<K, V> {
serDes.writeObject(out, e.getKey());
serDes.writeObject(out, e.getValue().get1());
serDes.writeObject(out, e.getValue().get2());
+ out.writeLong(e.getValue().get3());
});
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
index 288839c40be..ac3182b751b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
import
org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
@@ -62,10 +63,13 @@ public class ClientCachePutAllConflictRequest extends
ClientCacheDataRequest imp
KeyCacheObject key = readCacheObject(reader, true);
CacheObject val = readCacheObject(reader, false);
GridCacheVersion ver =
(GridCacheVersion)reader.readObjectDetached();
+ long expireTime = reader.readLong();
- GridCacheDrInfo info = expPlc ?
- new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED,
EXPIRE_TIME_CALCULATE) :
- new GridCacheDrInfo(val, ver);
+ GridCacheDrInfo info = expireTime != CU.EXPIRE_TIME_ETERNAL ?
+ new GridCacheDrExpirationInfo(val, ver, CU.TTL_ETERNAL,
expireTime) :
+ (expPlc
+ ? new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED,
EXPIRE_TIME_CALCULATE)
+ : new GridCacheDrInfo(val, ver));
map.put(key, info);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index 3d456f2e729..b836e6facd6 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -58,9 +58,11 @@ import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.Ignite
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;
@@ -90,6 +92,12 @@ public class CdcSelfTest extends AbstractCdcTest {
/** */
public static final String TX_CACHE_NAME = "tx-cache";
+ /** */
+ public static final long CREATE_TTL = 500_000L;
+
+ /** */
+ public static final long UPDATE_TTL = 60_000L;
+
/** */
@Parameterized.Parameter
public boolean specificConsistentId;
@@ -178,6 +186,79 @@ public class CdcSelfTest extends AbstractCdcTest {
}, true);
}
+ /** */
+ @Test
+ public void testReadExpireTime() throws Exception {
+ IgniteConfiguration cfg = getConfiguration("ignite-0");
+
+ Ignite ign = startGrid(cfg);
+
+ ign.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, User> cache =
ign.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ IgniteCache<Integer, User> withExpiry =
+ cache.withExpiryPolicy(new PlatformExpiryPolicy(CREATE_TTL,
UPDATE_TTL, 0L));
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ if (i % 2 == 0) {
+ withExpiry.put(i, createUser(i)); // Create.
+ withExpiry.put(i, createUser(i)); // Update.
+ }
+ else {
+ cache.put(i, createUser(i)); // Create.
+ cache.put(i, createUser(i)); // Update.
+ }
+ }
+
+ removeData(cache, 0, KEYS_CNT);
+
+ Set<Integer> seen = new HashSet<>();
+
+ UserCdcConsumer cnsmr = new UserCdcConsumer() {
+ /** {@inheritDoc} */
+ @Override public void checkEvent(CdcEvent evt) {
+ super.checkEvent(evt);
+
+ Integer key = (Integer)evt.key();
+
+ if (evt.value() == null || key % 2 != 0) {
+ assertEquals("Expire time must not be set [key=" + key +
']', CU.EXPIRE_TIME_ETERNAL, evt.expireTime());
+
+ return;
+ }
+
+ assertTrue(
+ "Expire must be set [key=" + key + ']',
+ evt.expireTime() != CU.EXPIRE_TIME_ETERNAL
+ );
+
+ long ttl = evt.expireTime() - System.currentTimeMillis();
+
+ assertTrue("Expire for operation", ttl <= (seen.contains(key)
? UPDATE_TTL : CREATE_TTL));
+
+ seen.add(key);
+ }
+ };
+
+ CdcMain cdcMain = createCdc(cnsmr, cfg);
+
+ IgniteInternalFuture<?> cdcFut = runAsync(cdcMain);
+
+ waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
+ waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
+
+ cdcFut.cancel();
+
+ assertTrue(cnsmr.stopped());
+
+ assertEquals(KEYS_CNT / 2, seen.size());
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
/** */
private void readAll(UserCdcConsumer cnsmr, boolean offsetCommit) throws
Exception {
IgniteConfiguration cfg = getConfiguration("ignite-0");
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
index b68ca7eeddd..c0c78101d9e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
@@ -17,21 +17,25 @@
package org.apache.ignite.internal.client.thin;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.Person;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
@@ -42,6 +46,9 @@ public class DataReplicationOperationsTest extends
AbstractThinClientTest {
/** Keys count. */
private static final int KEYS_CNT = 10;
+ /** TTL. */
+ public static final int TTL = 1000;
+
/** */
private static IgniteClient client;
@@ -55,28 +62,42 @@ public class DataReplicationOperationsTest extends
AbstractThinClientTest {
@Parameterized.Parameter
public boolean binary;
+ /** Cache mode. */
+ @Parameterized.Parameter(1)
+ public CacheAtomicityMode mode;
+
/** @return Test parameters. */
- @Parameterized.Parameters(name = "binary={0}")
+ @Parameterized.Parameters(name = "binary={0}, cacheMode={1}")
public static Collection<Object[]> parameters() {
- return cartesianProduct(F.asList(false, true));
+ List<Object[]> params = new ArrayList<>();
+
+ for (boolean binary : new boolean[]{false, true})
+ for (CacheAtomicityMode mode : new
CacheAtomicityMode[]{TRANSACTIONAL, ATOMIC})
+ params.add(new Object[]{binary, mode});
+
+ return params;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- startGrid();
+ startGrids(2);
- client = startClient(grid());
+ client = startClient(grid(0));
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- grid().destroyCaches(grid().cacheNames());
+ grid(0).destroyCaches(grid(0).cacheNames());
- cache = (TcpClientCache<Object,
Object>)client.createCache(DEFAULT_CACHE_NAME);
+ ClientCacheConfiguration ccfg = new ClientCacheConfiguration()
+ .setName(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(mode);
+
+ cache = (TcpClientCache<Object, Object>)client.createCache(ccfg);
if (binary)
cache = (TcpClientCache<Object, Object>)cache.withKeepBinary();
@@ -92,7 +113,7 @@ public class DataReplicationOperationsTest extends
AbstractThinClientTest {
/** */
@Test
public void testPutAllConflict() {
- Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+ Map<Object, T3<Object, GridCacheVersion, Long>> data =
createPutAllData(CU.EXPIRE_TIME_ETERNAL);
cache.putAllConflict(data);
@@ -121,10 +142,11 @@ public class DataReplicationOperationsTest extends
AbstractThinClientTest {
/** @throws Exception If fails. */
@Test
public void testWithExpiryPolicy() throws Exception {
- PlatformExpiryPolicy expPlc = new PlatformExpiryPolicy(1000, 1000,
1000);
+ PlatformExpiryPolicy expPlc = new PlatformExpiryPolicy(TTL, TTL, TTL);
ClientCacheConfiguration ccfgWithExpPlc = new
ClientCacheConfiguration()
.setName("cache-with-expiry-policy")
+ .setAtomicityMode(mode)
.setExpiryPolicy(expPlc);
TcpClientCache<Object, Object> cache = (TcpClientCache<Object,
Object>)client.getOrCreateCache(ccfgWithExpPlc);
@@ -132,7 +154,7 @@ public class DataReplicationOperationsTest extends
AbstractThinClientTest {
TcpClientCache<Object, Object> cacheWithExpPlc = binary ?
(TcpClientCache<Object, Object>)cache.withKeepBinary() : cache;
- Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+ Map<Object, T3<Object, GridCacheVersion, Long>> data =
createPutAllData(CU.EXPIRE_TIME_ETERNAL);
cacheWithExpPlc.putAllConflict(data);
@@ -144,16 +166,34 @@ public class DataReplicationOperationsTest extends
AbstractThinClientTest {
));
}
+ /** @throws Exception If fails. */
+ @Test
+ public void testWithPerEntryExpiry() throws Exception {
+ TcpClientCache<Object, Object> cache0 =
+ (TcpClientCache<Object,
Object>)client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ TcpClientCache<Object, Object> cache = binary ?
+ (TcpClientCache<Object, Object>)cache0.withKeepBinary() : cache0;
+
+ Map<Object, T3<Object, GridCacheVersion, Long>> data =
createPutAllData(System.currentTimeMillis() + TTL);
+
+ cache.putAllConflict(data);
+
+ assertTrue(cache.containsKeys(data.keySet()));
+
+ assertTrue(waitForCondition(() ->
data.keySet().stream().noneMatch(cache::containsKey), 2 * TTL));
+ }
+
/** */
- private Map<Object, T2<Object, GridCacheVersion>> createPutAllData() {
- Map<Object, T2<Object, GridCacheVersion>> map = new HashMap<>();
+ private Map<Object, T3<Object, GridCacheVersion, Long>>
createPutAllData(long expireTime) {
+ Map<Object, T3<Object, GridCacheVersion, Long>> map = new HashMap<>();
for (int i = 0; i < KEYS_CNT; i++) {
Person key = new Person(i, "Person-" + i);
Person val = new Person(i, "Person-" + i);
map.put(binary ? client.binary().toBinary(key) : key,
- new T2<>(binary ? client.binary().toBinary(val) : val,
otherVer));
+ new T3<>(binary ? client.binary().toBinary(val) : val,
otherVer, expireTime));
}
return map;