This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4fb139d IGNITE-12399 Java thin client: add cache expiry policies -
Fixes #7085.
4fb139d is described below
commit 4fb139d10913ba757900fa1231ba6dfa1afc16b5
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Nov 29 16:18:17 2019 +0300
IGNITE-12399 Java thin client: add cache expiry policies - Fixes #7085.
---
.../java/org/apache/ignite/client/ClientCache.java | 9 ++
.../ignite/client/ClientCacheConfiguration.java | 20 ++++
.../ignite/internal/client/thin/ClientUtils.java | 31 +++++-
.../internal/client/thin/ProtocolVersion.java | 5 +-
.../internal/client/thin/TcpClientCache.java | 72 ++++++++-----
.../internal/client/thin/TcpClientChannel.java | 2 +
.../cache/expiry/PlatformExpiryPolicy.java | 17 +++
.../org/apache/ignite/client/FunctionalTest.java | 114 ++++++++++++++++++++-
8 files changed, 240 insertions(+), 30 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
index 2f0a044..bffdbe5 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.Query;
@@ -337,6 +338,14 @@ public interface ClientCache<K, V> {
public <K1, V1> ClientCache<K1, V1> withKeepBinary();
/**
+ * Returns cache with the specified expiry policy set. This policy will be used for each operation invoked on
+ * the returned cache.
+ *
+ * @param expirePlc Expiry policy to use for operations invoked on the returned cache.
+ * @return Cache instance with the specified expiry policy set.
+ */
+ public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy
expirePlc);
+
+ /**
* Queries cache. Supports {@link ScanQuery} and {@link SqlFieldsQuery}.
*
* @param qry Query.
diff --git
a/modules/core/src/main/java/org/apache/ignite/client/ClientCacheConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/client/ClientCacheConfiguration.java
index a3db31d..8989ee4 100644
---
a/modules/core/src/main/java/org/apache/ignite/client/ClientCacheConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/client/ClientCacheConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client;
import java.io.Serializable;
+import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
@@ -124,6 +125,9 @@ public final class ClientCacheConfiguration implements
Serializable {
/** @serial Query entities. */
private QueryEntity[] qryEntities = null;
+ /** @serial Expiry policy. */
+ private ExpiryPolicy expiryPlc;
+
/**
* @return Cache name.
*/
@@ -648,6 +652,22 @@ public final class ClientCacheConfiguration implements
Serializable {
return this;
}
+ /**
+ * @return Expiry policy, or {@code null} if not configured.
+ */
+ public ExpiryPolicy getExpiryPolicy() {
+ return expiryPlc;
+ }
+
+ /**
+ * @param expiryPlc Expiry policy.
+ * @return {@code this} for chaining.
+ */
+ public ClientCacheConfiguration setExpiryPolicy(ExpiryPolicy expiryPlc) {
+ this.expiryPlc = expiryPlc;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ClientCacheConfiguration.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
index 3175cfc..a0ea2d2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
@@ -34,6 +34,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -55,8 +56,11 @@ import org.apache.ignite.internal.binary.BinarySchema;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_2_0;
+import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_6_0;
+import static
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;
/**
* Shared serialization/deserialization utils.
@@ -342,6 +346,24 @@ final class ClientUtils {
)
);
+ if (ver.compareTo(V1_6_0) >= 0) {
+ itemWriter.accept(CfgItem.EXPIRE_POLICY, w -> {
+ ExpiryPolicy expiryPlc = cfg.getExpiryPolicy();
+ if (expiryPlc == null)
+ w.writeBoolean(false);
+ else {
+ w.writeBoolean(true);
+
w.writeLong(convertDuration(expiryPlc.getExpiryForCreation()));
+
w.writeLong(convertDuration(expiryPlc.getExpiryForUpdate()));
+
w.writeLong(convertDuration(expiryPlc.getExpiryForAccess()));
+ }
+ });
+ }
+ else if (cfg.getExpiryPolicy() != null) {
+ throw new ClientProtocolError(String.format("Expire policies
have not supported by the server " +
+ "version %s, required version %s", ver, V1_6_0));
+ }
+
writer.writeInt(origPos, out.position() - origPos - 4); //
configuration length
writer.writeInt(origPos + 4, propCnt.get()); // properties count
}
@@ -467,7 +489,11 @@ final class ClientUtils {
}
));
}
- ).toArray(new QueryEntity[0]));
+ ).toArray(new QueryEntity[0]))
+ .setExpiryPolicy(
+ ver.compareTo(V1_6_0) < 0 ? null : reader.readBoolean() ?
+ new PlatformExpiryPolicy(reader.readLong(),
reader.readLong(), reader.readLong()) : null
+ );
}
}
@@ -638,7 +664,8 @@ final class ClientUtils {
/** Sql index max inline size. */SQL_IDX_MAX_INLINE_SIZE(204),
/** Sql schema. */SQL_SCHEMA(203),
/** Key configs. */KEY_CONFIGS(401),
- /** Key entities. */QUERY_ENTITIES(200);
+ /** Key entities. */QUERY_ENTITIES(200),
+ /** Expire policy. */EXPIRE_POLICY(407);
/** Code. */
private final short code;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
index 616561d..397b400 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.client.thin;
/** Thin client protocol version. */
public final class ProtocolVersion implements Comparable<ProtocolVersion> {
+ /** Protocol version: 1.6.0. Expiration policy support. */
+ public static final ProtocolVersion V1_6_0 = new ProtocolVersion((short)1,
(short)6, (short)0);
+
/** Protocol version: 1.5.0. Transactions support. */
public static final ProtocolVersion V1_5_0 = new ProtocolVersion((short)1,
(short)5, (short)0);
@@ -35,7 +38,7 @@ public final class ProtocolVersion implements
Comparable<ProtocolVersion> {
public static final ProtocolVersion V1_0_0 = new ProtocolVersion((short)1,
(short)0, (short)0);
/** Current protocol version. */
- public static final ProtocolVersion CURRENT_VER = V1_5_0;
+ public static final ProtocolVersion CURRENT_VER = V1_6_0;
/** Major. */
private final short major;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index fa07556..4bedc5a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.cache.Cache;
+import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.Query;
@@ -42,6 +43,8 @@ import
org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import
org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
import static java.util.AbstractMap.SimpleEntry;
+import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_6_0;
+import static
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;
/**
* Implementation of {@link ClientCache} over TCP protocol.
@@ -53,6 +56,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
/** "Transactional" flag mask. */
private static final byte TRANSACTIONAL_FLAG_MASK = 0x02;
+ /** "With expiry policy" flag mask. */
+ private static final byte WITH_EXPIRY_POLICY_FLAG_MASK = 0x04;
+
/** Cache id. */
private final int cacheId;
@@ -72,10 +78,19 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
private final ClientUtils serDes;
/** Indicates if cache works with Ignite Binary format. */
- private boolean keepBinary = false;
+ private final boolean keepBinary;
+
+ /** Expiry policy. */
+ private final ExpiryPolicy expiryPlc;
/** Constructor. */
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller
marsh, TcpClientTransactions transactions) {
+ this(name, ch, marsh, transactions, false, null);
+ }
+
+ /** Constructor. */
+ TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller
marsh, TcpClientTransactions transactions,
+ boolean keepBinary, ExpiryPolicy expiryPlc) {
this.name = name;
this.cacheId = ClientUtils.cacheId(name);
this.ch = ch;
@@ -83,6 +98,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
this.transactions = transactions;
serDes = new ClientUtils(marsh);
+
+ this.keepBinary = keepBinary;
+ this.expiryPlc = expiryPlc;
}
/** {@inheritDoc} */
@@ -361,26 +379,13 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
/** {@inheritDoc} */
@Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
- TcpClientCache<K1, V1> binCache;
-
- if (keepBinary) {
- try {
- binCache = (TcpClientCache<K1, V1>)this;
- }
- catch (ClassCastException ex) {
- throw new IllegalStateException(
- "Trying to enable binary mode on already binary cache with
different key/value type arguments.",
- ex
- );
- }
- }
- else {
- binCache = new TcpClientCache<>(name, ch, marsh, transactions);
-
- binCache.keepBinary = true;
- }
+ return keepBinary ? (ClientCache<K1, V1>)this :
+ new TcpClientCache<>(name, ch, marsh, transactions, true,
expiryPlc);
+ }
- return binCache;
+ /** {@inheritDoc} */
+ @Override public <K1, V1> ClientCache<K1, V1>
withExpirePolicy(ExpiryPolicy expirePlc) {
+ return new TcpClientCache<>(name, ch, marsh, transactions, keepBinary,
expirePlc);
}
/** {@inheritDoc} */
@@ -515,19 +520,34 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
TcpClientTransaction tx = transactions.tx();
- if (tx != null) {
- flags |= TRANSACTIONAL_FLAG_MASK;
+ if (expiryPlc != null) {
+ if (payloadCh.clientChannel().serverVersion().compareTo(V1_6_0) <
0) {
+ throw new ClientProtocolError(String.format("Expire policies
have not supported by the server " +
+ "version %s, required version %s",
payloadCh.clientChannel().serverVersion(), V1_6_0));
+ }
+ flags |= WITH_EXPIRY_POLICY_FLAG_MASK;
+ }
+
+ if (tx != null) {
if (tx.clientChannel() != payloadCh.clientChannel()) {
throw new ClientException("Transaction context has been lost
due to connection errors. " +
"Cache operations are prohibited until current transaction
closed.");
}
- out.writeByte(flags);
- out.writeInt(tx.txId());
+ flags |= TRANSACTIONAL_FLAG_MASK;
}
- else
- out.writeByte(flags);
+
+ out.writeByte(flags);
+
+ if ((flags & WITH_EXPIRY_POLICY_FLAG_MASK) != 0) {
+ out.writeLong(convertDuration(expiryPlc.getExpiryForCreation()));
+ out.writeLong(convertDuration(expiryPlc.getExpiryForUpdate()));
+ out.writeLong(convertDuration(expiryPlc.getExpiryForAccess()));
+ }
+
+ if ((flags & TRANSACTIONAL_FLAG_MASK) != 0)
+ out.writeInt(tx.txId());
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index e23e20b..4582b5a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -84,6 +84,7 @@ import static
org.apache.ignite.internal.client.thin.ProtocolVersion.V1_1_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_2_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_4_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_5_0;
+import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_6_0;
/**
* Implements {@link ClientChannel} over TCP.
@@ -91,6 +92,7 @@ import static
org.apache.ignite.internal.client.thin.ProtocolVersion.V1_5_0;
class TcpClientChannel implements ClientChannel {
/** Supported protocol versions. */
private static final Collection<ProtocolVersion> supportedVers =
Arrays.asList(
+ V1_6_0,
V1_5_0,
V1_4_0,
V1_2_0,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/expiry/PlatformExpiryPolicy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/expiry/PlatformExpiryPolicy.java
index d86d889..4d880be 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/expiry/PlatformExpiryPolicy.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/expiry/PlatformExpiryPolicy.java
@@ -90,4 +90,21 @@ public class PlatformExpiryPolicy implements ExpiryPolicy {
return new Duration(TimeUnit.MILLISECONDS, dur);
}
}
+
+ /**
+ * Converts an actual duration to the encoded {@code long} form used for serialization.
+ *
+ * @param dur Actual duration, may be {@code null}.
+ * @return {@code DUR_UNCHANGED} for {@code null}, {@code DUR_ETERNAL} if eternal, {@code DUR_ZERO} if zero, else milliseconds.
+ */
+ public static long convertDuration(Duration dur) {
+ if (dur == null)
+ return DUR_UNCHANGED;
+ else if (dur.isEternal())
+ return DUR_ETERNAL;
+ else if (dur.isZero())
+ return DUR_ZERO;
+ else
+ return dur.getTimeUnit().toMillis(dur.getDurationAmount());
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
index ff2931e..510e791 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
@@ -30,14 +30,20 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
@@ -50,8 +56,10 @@ import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.testframework.GridTestUtils;
@@ -194,7 +202,8 @@ public class FunctionalTest {
.setDefaultFieldValues(Collections.singletonMap("id", 0))
.setIndexes(Collections.singletonList(new QueryIndex("id",
true, "IDX_EMPLOYEE_ID")))
.setAliases(Stream.of("id",
"orgId").collect(Collectors.toMap(f -> f, String::toUpperCase)))
- );
+ )
+ .setExpiryPolicy(new PlatformExpiryPolicy(10, 20, 30));
ClientCache cache = client.createCache(cacheCfg);
@@ -816,6 +825,109 @@ public class FunctionalTest {
}
}
+ /**
+ * Tests per-operation created/modified/accessed expiry policies, including together with keep binary and transactions.
+ */
+ @Test
+ public void testExpirePolicy() throws Exception {
+ long ttl = 600L;
+ int MAX_RETRIES = 5;
+
+ try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
+ IgniteClient client =
Ignition.startClient(getClientConfiguration())
+ ) {
+ ClientCache<Integer, Object> cache = client.createCache(new
ClientCacheConfiguration()
+ .setName("cache")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ );
+
+ Duration dur = new Duration(TimeUnit.MILLISECONDS, ttl);
+
+ ClientCache<Integer, Object> cachePlcCreated =
cache.withExpirePolicy(new CreatedExpiryPolicy(dur));
+ ClientCache<Integer, Object> cachePlcUpdated =
cache.withExpirePolicy(new ModifiedExpiryPolicy(dur));
+ ClientCache<Integer, Object> cachePlcAccessed =
cache.withExpirePolicy(new AccessedExpiryPolicy(dur));
+
+ for (int i = 0; i < MAX_RETRIES; i++) {
+ cache.clear();
+
+ long ts = U.currentTimeMillis();
+
+ cache.put(0, 0);
+ cachePlcCreated.put(1, 1);
+ cachePlcUpdated.put(2, 2);
+ cachePlcAccessed.put(3, 3);
+
+ U.sleep(ttl / 3 * 2);
+
+ boolean containsKey0 = cache.containsKey(0);
+ boolean containsKey1 = cache.containsKey(1);
+ boolean containsKey2 = cache.containsKey(2);
+ boolean containsKey3 = cache.containsKey(3);
+
+ if (U.currentTimeMillis() - ts >= ttl) // Retry if this block
execution takes too long.
+ continue;
+
+ assertTrue(containsKey0);
+ assertTrue(containsKey1);
+ assertTrue(containsKey2);
+ assertTrue(containsKey3);
+
+ ts = U.currentTimeMillis();
+
+ cachePlcCreated.put(1, 2);
+ cachePlcCreated.get(1); // Update and access key with created
expire policy.
+ cachePlcUpdated.put(2, 3); // Update key with modified expire
policy.
+ cachePlcAccessed.get(3); // Access key with accessed expire
policy.
+
+ U.sleep(ttl / 3 * 2);
+
+ containsKey0 = cache.containsKey(0);
+ containsKey1 = cache.containsKey(1);
+ containsKey2 = cache.containsKey(2);
+ containsKey3 = cache.containsKey(3);
+
+ if (U.currentTimeMillis() - ts >= ttl) // Retry if this block
execution takes too long.
+ continue;
+
+ assertTrue(containsKey0);
+ assertFalse(containsKey1);
+ assertTrue(containsKey2);
+ assertTrue(containsKey3);
+
+ U.sleep(ttl / 3 * 2);
+
+ cachePlcUpdated.get(2); // Access key with updated expire
policy.
+
+ U.sleep(ttl / 3 * 2);
+
+ assertTrue(cache.containsKey(0));
+ assertFalse(cache.containsKey(1));
+ assertFalse(cache.containsKey(2));
+ assertFalse(cache.containsKey(3));
+
+ // Expire policy, keep binary and transactional flags together.
+ ClientCache<Integer, Object> binCache =
cachePlcCreated.withKeepBinary();
+
+ try (ClientTransaction tx = client.transactions().txStart()) {
+ binCache.put(4, new T2<>("test", "test"));
+
+ tx.commit();
+ }
+
+ assertTrue(binCache.get(4) instanceof BinaryObject);
+ assertFalse(cache.get(4) instanceof BinaryObject);
+
+ U.sleep(ttl / 3 * 4);
+
+ assertFalse(cache.containsKey(4));
+
+ return;
+ }
+
+ fail("Failed to check expire policy within " + MAX_RETRIES + "
retries (block execution takes too long)");
+ }
+ }
+
/** */
private static ClientConfiguration getClientConfiguration() {
return new ClientConfiguration()