Repository: incubator-ignite Updated Branches: refs/heads/ignite-41 5f95bd2e5 -> 85de24713
# ignite-41 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/85de2471 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/85de2471 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/85de2471 Branch: refs/heads/ignite-41 Commit: 85de247138059e611110703165ddc07bded66cc4 Parents: 5f95bd2 Author: sboikov <[email protected]> Authored: Thu Dec 18 15:05:39 2014 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 18 17:50:41 2014 +0300 ---------------------------------------------------------------------- .../cache/GridCacheAccessExpiryPolicy.java | 34 +++-- .../processors/cache/GridCacheAdapter.java | 2 +- .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheExpiryPolicy.java | 47 ++++++ .../processors/cache/GridCacheMapEntry.java | 74 ++++----- .../processors/cache/GridCacheTxEntry.java | 2 +- .../kernal/processors/cache/GridCacheUtils.java | 2 +- .../distributed/GridCacheExpiryPolicy.java | 152 ------------------ .../GridCacheExternalizableExpiryPolicy.java | 153 +++++++++++++++++++ .../distributed/dht/GridDhtCacheAdapter.java | 10 +- .../cache/distributed/dht/GridDhtGetFuture.java | 4 +- .../dht/GridPartitionedGetFuture.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 128 +++++++++++++--- .../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +- .../distributed/near/GridNearAtomicCache.java | 15 +- .../distributed/near/GridNearCacheAdapter.java | 23 ++- .../distributed/near/GridNearCacheEntry.java | 11 +- .../distributed/near/GridNearGetFuture.java | 8 +- .../near/GridNearTransactionalCache.java | 14 +- .../local/atomic/GridLocalAtomicCache.java | 18 ++- .../GridTcpCommunicationMessageFactory.java | 2 +- .../IgniteCacheExpiryPolicyAbstractTest.java | 119 +++++++++++---- .../cache/GridCacheAbstractTtlSelfTest.java | 95 ------------ .../processors/cache/GridCacheTestEntryEx.java | 6 +- .../near/GridCachePartitionedTtlSelfTest.java | 25 --- .../GridCacheReplicatedTtlSelfTest.java | 24 --- .../cache/local/GridCacheLocalTtlSelfTest.java | 25 --- .../bamboo/GridDataGridTestSuite.java | 2 - .../cache/GridCacheAbstractQuerySelfTest.java | 14 +- 29 files changed, 538 insertions(+), 483 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java index 07f4ae8..f0904dd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java @@ -20,9 +20,9 @@ import java.util.*; /** * */ -public class GridCacheAccessExpiryPolicy { +public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy { /** */ - private final long ttl; + private final long accessTtl; /** */ private volatile Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; @@ -44,19 +44,27 @@ public class GridCacheAccessExpiryPolicy { } /** - * @param ttl TTL for access. + * @param accessTtl TTL for access. */ - public GridCacheAccessExpiryPolicy(long ttl) { - assert ttl >= 0 : ttl; + public GridCacheAccessExpiryPolicy(long accessTtl) { + assert accessTtl >= 0 : accessTtl; - this.ttl = ttl; + this.accessTtl = accessTtl; } - /** - * @return TTL. - */ - public long ttl() { - return ttl; + /** {@inheritDoc} */ + @Override public long forAccess() { + return accessTtl; + } + + /** {@inheritDoc} */ + @Override public long forCreate() { + return -1L; + } + + /** {@inheritDoc} */ + @Override public long forUpdate() { + return -1L; } /** @@ -75,7 +83,7 @@ public class GridCacheAccessExpiryPolicy { * @param ver Entry version. */ @SuppressWarnings("unchecked") - public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) { + @Override public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) { Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries; if (entries0 == null) { @@ -93,7 +101,7 @@ public class GridCacheAccessExpiryPolicy { /** * @return TTL update request. */ - @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { + @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { return entries; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index a4ec90f..46029e7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -1765,7 +1765,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im final String taskName, final boolean deserializePortable, final boolean forcePrimary, - @Nullable GridCacheAccessExpiryPolicy expiry, + @Nullable GridCacheExpiryPolicy expiry, @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter ) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index c6e3ea6..b684183 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -292,7 +292,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { Object transformClo, String taskName, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheAccessExpiryPolicy expiryPlc) + @Nullable GridCacheExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException; /** @@ -425,7 +425,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { @Nullable byte[] valBytes, boolean writeThrough, boolean retval, - @Nullable ExpiryPolicy expiryPlc, + @Nullable GridCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java new file mode 100644 index 0000000..3ce9572 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java @@ -0,0 +1,47 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public interface GridCacheExpiryPolicy { + /** + * @return TTL. + */ + public abstract long forCreate(); + + /** + * @return TTL. + */ + public abstract long forUpdate(); + + /** + * @return TTL. + */ + public abstract long forAccess(); + + /** + * @param key Entry key. + * @param keyBytes Entry key bytes. + * @param ver Entry version. + */ + public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver); + + /** + * @return TTL update request. + */ + @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index c191dda..19d28b6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -700,7 +700,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> Object transformClo, String taskName, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheAccessExpiryPolicy expirePlc) + @Nullable GridCacheExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { cctx.denyOnFlag(LOCAL); @@ -733,7 +733,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> Object transformClo, String taskName, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheAccessExpiryPolicy expiryPlc) + @Nullable GridCacheExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException { // Disable read-through if there is no store. if (readThrough && !cctx.isStoreEnabled()) @@ -882,13 +882,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } if (ret != null && expiryPlc != null) { - long ttl = expiryPlc.ttl(); + long ttl = expiryPlc.forAccess(); assert ttl >= 0 : ttl; updateTtl(ttl); - expiryPlc.ttlUpdated(key(), getOrMarshalKeyBytes(), version()); + expiryPlc.onAccessUpdated(key(), getOrMarshalKeyBytes(), version()); } } @@ -1464,6 +1464,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> assert cctx.isLocal() && cctx.atomic(); V old; + boolean res = true; IgniteBiTuple<Boolean, ?> interceptorRes = null; @@ -1501,8 +1502,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (!F.isEmpty(filter)) { boolean pass = cctx.isAll(wrapFilterLocked(), filter); - if (!pass) + if (!pass) { + if (expiryPlc != null && hasValueUnlocked()) { + long ttl = toTtl(expiryPlc.getExpiryForAccess()); + + if (ttl != -1L) + updateTtl(ttl); + } + return new IgniteBiTuple<>(false, retval ? old : null); + } } // Apply metrics. @@ -1658,20 +1667,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } /** - * @param expiryPlc Expiry policy. - * @param isNew {@code True} if entry is new. - * @return TTL. - */ - private static long ttlFromPolicy(@Nullable ExpiryPolicy expiryPlc, boolean isNew) { - if (expiryPlc == null) - return -1L; - - Duration duration = isNew ? expiryPlc.getExpiryForCreation() : expiryPlc.getExpiryForUpdate(); - - return toTtl(duration); - } - - /** * @param duration Duration. * @return TTL. */ @@ -1703,7 +1698,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable byte[] valBytes, boolean writeThrough, boolean retval, - @Nullable ExpiryPolicy expiryPlc, + @Nullable GridCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, @@ -1750,7 +1745,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> op, writeObj, valBytes, - ttlFromPolicy(expiryPlc, isNew()), + expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L, drTtl, drExpireTime, drVer); @@ -1844,18 +1839,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (!pass) { if (hasValueUnlocked() && expiryPlc != null) { - Duration duration = expiryPlc.getExpiryForAccess(); + newTtl = expiryPlc.forAccess(); - newTtl = toTtl(duration); - - if (newTtl != -1L) + if (newTtl != -1L) { updateTtl(newTtl); + + expiryPlc.onAccessUpdated(key, getOrMarshalKeyBytes(), version()); + } } return new GridCacheUpdateAtomicResult<>(false, retval ? old : null, null, - newTtl, + 0L, -1L, null, null, @@ -1929,25 +1925,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> else { assert drExpireTime == -1L; - if (expiryPlc != null) { - if (!hadVal) { - Duration duration = expiryPlc.getExpiryForCreation(); - - if (duration != null && duration.isZero()) - return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, - null, - 0L, - -1L, - null, - null, - false); - - newTtl = toTtl(duration); - } - else - newTtl = toTtl(expiryPlc.getExpiryForUpdate()); - } + if (expiryPlc != null) + newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); else newTtl = -1L; @@ -3428,6 +3407,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> /** {@inheritDoc} */ @Override public void updateTtl(GridCacheVersion ver, long ttl) { synchronized (this) { + updateTtl(ttl); + + /* + TODO IGNITE-41. try { if (ver.equals(version())) updateTtl(ttl); @@ -3435,6 +3418,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> catch (GridCacheEntryRemovedException ignored) { // No-op. } + */ } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java index df888e7..0bff22b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java @@ -836,7 +836,7 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab out.writeBoolean(grpLock); CU.writeVersion(out, drVer); - out.writeObject(transferExpiryPlc ? new GridCacheExpiryPolicy(expiryPlc) : null); + out.writeObject(transferExpiryPlc ? new GridCacheExternalizableExpiryPolicy(expiryPlc) : null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java index d28f728..77f6a7a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java @@ -1586,7 +1586,7 @@ public class GridCacheUtils { * @return Expire time. */ public static long toExpireTime(long ttl) { - assert ttl >= 0L; + assert ttl >= 0L : ttl; if (ttl == 0L) return 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java deleted file mode 100644 index 3a77884..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java +++ /dev/null @@ -1,152 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import javax.cache.expiry.*; -import java.io.*; -import java.util.concurrent.*; - -/** - * - */ -public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable { - /** */ - private ExpiryPolicy plc; - - /** */ - private static final byte CREATE_TTL_MASK = 0x01; - - /** */ - private static final byte UPDATE_TTL_MASK = 0x02; - - /** */ - private static final byte ACCESS_TTL_MASK = 0x04; - - /** */ - private Duration forCreate; - - /** */ - private Duration forUpdate; - - /** */ - private Duration forAccess; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheExpiryPolicy() { - // No-op. - } - - /** - * @param plc Expiry policy. - */ - public GridCacheExpiryPolicy(ExpiryPolicy plc) { - assert plc != null; - - this.plc = plc; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForCreation() { - return forCreate; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForAccess() { - return forAccess; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForUpdate() { - return forUpdate; - } - - /** - * @param out Output stream. - * @param duration Duration. - * @throws IOException - */ - private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException { - if (duration != null) { - if (duration.isEternal()) - out.writeLong(0); - else if (duration.getDurationAmount() == 0) - out.writeLong(1); - else - out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount())); - } - } - - /** - * @param in Input stream. - * @return Duration. - * @throws IOException - */ - private Duration readDuration(ObjectInput in) throws IOException { - long ttl = in.readLong(); - - assert ttl >= 0; - - if (ttl == 0) - return Duration.ETERNAL; - - return new Duration(TimeUnit.MILLISECONDS, ttl); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - byte flags = 0; - - Duration create = plc.getExpiryForCreation(); - - if (create != null) - flags |= CREATE_TTL_MASK; - - Duration update = plc.getExpiryForUpdate(); - - if (update != null) - flags |= UPDATE_TTL_MASK; - - Duration access = plc.getExpiryForAccess(); - - if (access != null) - flags |= ACCESS_TTL_MASK; - - out.writeByte(flags); - - writeDuration(out, create); - - writeDuration(out, update); - - writeDuration(out, access); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - byte flags = in.readByte(); - - if ((flags & CREATE_TTL_MASK) != 0) - forCreate = readDuration(in); - - if ((flags & UPDATE_TTL_MASK) != 0) - forUpdate = readDuration(in); - - if ((flags & ACCESS_TTL_MASK) != 0) - forAccess = readDuration(in); - } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheExpiryPolicy.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java new file mode 100644 index 0000000..75da5de --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java @@ -0,0 +1,153 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.distributed; + +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.expiry.*; +import java.io.*; +import java.util.concurrent.*; + +/** + * + */ +public class GridCacheExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable { + /** */ + private ExpiryPolicy plc; + + /** */ + private static final byte CREATE_TTL_MASK = 0x01; + + /** */ + private static final byte UPDATE_TTL_MASK = 0x02; + + /** */ + private static final byte ACCESS_TTL_MASK = 0x04; + + /** */ + private Duration forCreate; + + /** */ + private Duration forUpdate; + + /** */ + private Duration forAccess; + + /** + * Required by {@link Externalizable}. + */ + public GridCacheExternalizableExpiryPolicy() { + // No-op. + } + + /** + * @param plc Expiry policy. + */ + public GridCacheExternalizableExpiryPolicy(ExpiryPolicy plc) { + assert plc != null; + + this.plc = plc; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + return forCreate; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + return forAccess; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + return forUpdate; + } + + /** + * @param out Output stream. + * @param duration Duration. + * @throws IOException If failed. + */ + private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException { + if (duration != null) { + if (duration.isEternal()) + out.writeLong(0); + else if (duration.getDurationAmount() == 0) + out.writeLong(1); + else + out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount())); + } + } + + /** + * @param in Input stream. + * @return Duration. + * @throws IOException If failed. + */ + private Duration readDuration(ObjectInput in) throws IOException { + long ttl = in.readLong(); + + assert ttl >= 0; + + if (ttl == 0) + return Duration.ETERNAL; + + return new Duration(TimeUnit.MILLISECONDS, ttl); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + byte flags = 0; + + Duration create = plc.getExpiryForCreation(); + + if (create != null) + flags |= CREATE_TTL_MASK; + + Duration update = plc.getExpiryForUpdate(); + + if (update != null) + flags |= UPDATE_TTL_MASK; + + Duration access = plc.getExpiryForAccess(); + + if (access != null) + flags |= ACCESS_TTL_MASK; + + out.writeByte(flags); + + writeDuration(out, create); + + writeDuration(out, update); + + writeDuration(out, access); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + byte flags = in.readByte(); + + if ((flags & CREATE_TTL_MASK) != 0) + forCreate = readDuration(in); + + if ((flags & UPDATE_TTL_MASK) != 0) + forUpdate = readDuration(in); + + if ((flags & ACCESS_TTL_MASK) != 0) + forAccess = readDuration(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheExternalizableExpiryPolicy.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index fef3b82..a7b1fea 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -428,7 +428,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], GridCacheAccessExpiryPolicy)} + * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], org.gridgain.grid.kernal.processors.cache.GridCacheExpiryPolicy)} * method instead to retrieve DHT value. * * @param keys {@inheritDoc} @@ -483,7 +483,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap String taskName, boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheAccessExpiryPolicy expiry + @Nullable GridCacheExpiryPolicy expiry ) { return getAllAsync(keys, null, @@ -518,7 +518,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap int taskNameHash, boolean deserializePortable, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheAccessExpiryPolicy expiry) { + @Nullable GridCacheExpiryPolicy expiry) { GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, reader, @@ -599,7 +599,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @param expiryPlc Expiry policy. */ - protected void sendTtlUpdateRequest(@Nullable final GridCacheAccessExpiryPolicy expiryPlc) { + public void sendTtlUpdateRequest(@Nullable final GridCacheExpiryPolicy expiryPlc) { if (expiryPlc != null && expiryPlc.entries() != null) { ctx.closures().runLocalSafe(new Runnable() { @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"}) @@ -622,7 +622,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); if (req == null) { - reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.ttl())); + reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.forAccess())); req.cacheId(ctx.cacheId()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java index 2ce4b6c..7a0db43 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -84,7 +84,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col private boolean deserializePortable; /** Expiry policy. */ - private GridCacheAccessExpiryPolicy expiryPlc; + private GridCacheExpiryPolicy expiryPlc; /** * Empty constructor required for {@link Externalizable}. @@ -119,7 +119,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col @Nullable UUID subjId, int taskNameHash, boolean deserializePortable, - @Nullable GridCacheAccessExpiryPolicy expiryPlc) { + @Nullable GridCacheExpiryPolicy expiryPlc) { super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer()); assert reader != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 4a3491d..942d0c5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -89,7 +89,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private boolean deserializePortable; /** Expiry policy. */ - private GridCacheAccessExpiryPolicy expiryPlc; + private GridCacheExpiryPolicy expiryPlc; /** * Empty constructor required for {@link Externalizable}. @@ -121,7 +121,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable GridCacheAccessExpiryPolicy expiryPlc + @Nullable GridCacheExpiryPolicy expiryPlc ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); @@ -363,7 +363,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M filters, subjId, taskName == null ? 0 : taskName.hashCode(), - expiryPlc != null ? expiryPlc.ttl() : -1L); + expiryPlc != null ? expiryPlc.forAccess() : -1L); add(fut); // Append new future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 3b07fc0..c3ef323 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -16,7 +16,6 @@ import org.apache.ignite.plugin.security.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; @@ -893,6 +892,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + GridCacheExpiryPolicy expiry = null; + try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. @@ -941,17 +942,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean replicate = ctx.isDrEnabled(); + expiry = expiryPolicy(req.expiry() != null ? req.expiry() : ctx.expiry()); + if (storeEnabled() && keys.size() > 1 && !ctx.dr().receiveEnabled()) { // This method can only be used when there are no replicated entries in the batch. - UpdateBatchResult<K, V> updRes = updateWithBatch(node, hasNear, req, res, locked, ver, - dhtFut, completionCb, replicate, taskName); + UpdateBatchResult<K, V> updRes = updateWithBatch(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + replicate, + taskName, + expiry); deleted = updRes.deleted(); dhtFut = updRes.dhtFuture(); } else { - UpdateSingleResult<K, V> updRes = updateSingle(node, hasNear, req, res, locked, ver, - dhtFut, completionCb, replicate, taskName); + UpdateSingleResult<K, V> updRes = updateSingle(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + completionCb, + replicate, + taskName, + expiry); retVal = updRes.returnValue(); deleted = updRes.deleted(); @@ -1013,6 +1034,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { else completionCb.apply(req, res); } + + sendTtlUpdateRequest(expiry); } /** @@ -1027,6 +1050,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param dhtFut Optional DHT future. * @param completionCb Completion callback to invoke when DHT future is completed. * @param replicate Whether replication is enabled. + * @param taskName Task name. + * @param expiry Expiry policy. * @return Deleted entries. * @throws GridCacheEntryRemovedException Should not be thrown. */ @@ -1041,7 +1066,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut, CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, boolean replicate, - String taskName + String taskName, + @Nullable GridCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue(); // Should not request return values for putAll. @@ -1078,7 +1104,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { if (!checkFilter(entry, req, res)) { - // TODO IGNITE-41 update TTL. + if (expiry != null && entry.hasValue()) { + long ttl = expiry.forAccess(); + + if (ttl != -1L) { + entry.updateTtl(null, ttl); + + expiry.onAccessUpdated(entry.key(), entry.getOrMarshalKeyBytes(), entry.version()); + } + } if (log.isDebugEnabled()) log.debug("Entry did not pass the filter (will skip write) [entry=" + entry + @@ -1143,7 +1177,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res, replicate, updRes, - taskName); + taskName, + expiry); firstEntryIdx = i + 1; @@ -1184,7 +1219,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res, replicate, updRes, - taskName); + taskName, + expiry); firstEntryIdx = i + 1; @@ -1293,7 +1329,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res, replicate, updRes, - taskName); + taskName, + expiry); } else assert filtered.isEmpty(); @@ -1366,6 +1403,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param completionCb Completion callback to invoke when DHT future is completed. * @param replicate Whether DR is enabled for that cache. * @param taskName Task name. + * @param expiry Expiry policy. * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. */ @@ -1379,7 +1417,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut, CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb, boolean replicate, - String taskName + String taskName, + @Nullable GridCacheExpiryPolicy expiry ) throws GridCacheEntryRemovedException { GridCacheReturn<Object> retVal = null; Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null; @@ -1394,8 +1433,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); - // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { K k = keys.get(i); @@ -1505,8 +1542,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expireTime); } else { - // TODO IGNITE-41 ttl could be changed. - if (log.isDebugEnabled()) log.debug("Entry did not pass the filter or conflict resolution (will skip write) " + "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); @@ -1610,7 +1645,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicUpdateResponse<K, V> res, boolean replicate, UpdateBatchResult<K, V> batchRes, - String taskName + String taskName, + @Nullable GridCacheExpiryPolicy expiry ) { assert putMap == null ^ rmvKeys == null; @@ -1658,8 +1694,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); - // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry<K, V> entry = entries.get(i); @@ -1703,7 +1737,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, primary, ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. - req.filter(), // TODO IGNITE-41 filter already evaluated? + null, replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, -1L, -1L, @@ -2375,6 +2409,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } + /** + * @param plc Expiry policy. + * @return Expiry policy wrapper. + */ + static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { + return plc == null ? null : new UpdateExpiryPolicy(plc); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicCache.class, this, super.toString()); @@ -2652,4 +2694,52 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { pendingResponses.remove(nodeId, this); } } + + /** + * + */ + private static class UpdateExpiryPolicy implements GridCacheExpiryPolicy { + /** */ + private final ExpiryPolicy plc; + + /** */ + private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; + + /** + * @param plc Expiry policy. + */ + private UpdateExpiryPolicy(ExpiryPolicy plc) { + assert plc != null; + + this.plc = plc; + } + + /** {@inheritDoc} */ + @Override public long forCreate() { + return GridCacheMapEntry.toTtl(plc.getExpiryForCreation()); + } + + /** {@inheritDoc} */ + @Override public long forUpdate() { + return GridCacheMapEntry.toTtl(plc.getExpiryForUpdate()); + } + + /** {@inheritDoc} */ + @Override public long forAccess() { + return GridCacheMapEntry.toTtl(plc.getExpiryForAccess()); + } + + /** {@inheritDoc} */ + @Override public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) { + if (entries == null) + entries = new HashMap<>(); + + entries.put(key, new IgniteBiTuple<>(keyBytes, ver)); + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { + return entries; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 3eca7e2..1265edb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -492,7 +492,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im filterBytes = marshalFilter(filter, ctx); if (expiryPlc != null) - expiryPlcBytes = CU.marshal(ctx, new GridCacheExpiryPolicy(expiryPlc)); + expiryPlcBytes = CU.marshal(ctx, new GridCacheExternalizableExpiryPolicy(expiryPlc)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index 1da6626..5c89c3f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -23,7 +23,6 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; -import javax.cache.expiry.*; import java.io.*; import java.util.*; @@ -363,9 +362,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - subjId = ctx.subjectIdPerCall(subjId); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); - return loadAsync(null, keys, false, forcePrimary, filter, subjId, taskName, deserializePortable); + subjId = ctx.subjectIdPerCall(subjId, prj); + + return loadAsync(null, + keys, + false, + forcePrimary, + filter, + subjId, + taskName, + deserializePortable, + prj != null ? prj.expiry() : null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java index 6c2fa8c..6f71d3c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -21,6 +21,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; @@ -164,7 +165,15 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Override public IgniteFuture<Object> readThroughAllAsync(Collection<? extends K> keys, boolean reload, GridCacheTxEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName, IgniteBiInClosure<K, V> vis) { - return (IgniteFuture)loadAsync(tx, keys, reload, false, filter, subjId, taskName, true); + return (IgniteFuture)loadAsync(tx, + keys, + reload, + false, + filter, + subjId, + taskName, + true, + null); } /** {@inheritDoc} */ @@ -246,6 +255,10 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param reload Reload flag. * @param forcePrimary Force primary flag. * @param filter Filter. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. * @return Loaded values. */ public IgniteFuture<Map<K, V>> loadAsync(@Nullable GridCacheTxEx tx, @@ -255,7 +268,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName, - boolean deserializePortable) { + boolean deserializePortable, + @Nullable ExpiryPolicy expiryPlc) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); @@ -264,7 +278,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null; - // TODO IGNITE-41. + final GridCacheAccessExpiryPolicy expiry = + GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, @@ -275,7 +290,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda subjId, taskName, deserializePortable, - null); + expiry); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java index 7759f49..967496b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -312,8 +312,15 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> { /** {@inheritDoc} */ @Override protected V readThrough(GridCacheTxEx<K, V> tx, K key, boolean reload, IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, String taskName) throws IgniteCheckedException { - return cctx.near().loadAsync(tx, F.asList(key), reload, /*force primary*/false, filter, subjId, taskName, true). - get().get(key); + return cctx.near().loadAsync(tx, + F.asList(key), + reload, + /*force primary*/false, + filter, + subjId, + taskName, + true, + null).get().get(key); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java index b5d5e29..ea8450a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java @@ -91,7 +91,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma private boolean deserializePortable; /** Expiry policy. */ - private GridCacheAccessExpiryPolicy expiryPlc; + private GridCacheExpiryPolicy expiryPlc; /** * Empty constructor required for {@link Externalizable}. @@ -123,7 +123,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable GridCacheAccessExpiryPolicy expiryPlc + @Nullable GridCacheExpiryPolicy expiryPlc ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); @@ -238,6 +238,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (trackable) cctx.mvcc().removeFuture(this); + cache().dht().sendTtlUpdateRequest(expiryPlc); + return true; } @@ -366,7 +368,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma filters, subjId, taskName == null ? 0 : taskName.hashCode(), - expiryPlc != null ? expiryPlc.ttl() : -1L); + expiryPlc != null ? expiryPlc.forAccess() : -1L); add(fut); // Append new future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java index c8d90e2..416d193 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -108,9 +108,19 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> }); } - subjId = ctx.subjectIdPerCall(subjId); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); - return loadAsync(null, keys, false, forcePrimary, filter, subjId, taskName, deserializePortable); + subjId = ctx.subjectIdPerCall(subjId, prj); + + return loadAsync(null, + keys, + false, + forcePrimary, + filter, + subjId, + taskName, + deserializePortable, + prj != null ? prj.expiry() : null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index d901311..6f2b4e4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -257,7 +257,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (GridCacheReturn<V>)updateAllInternal(DELETE, Collections.singleton(key), null, - null, + expiryPerCall(), true, true, ctx.equalsPeekArray(val), @@ -384,7 +384,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (V)updateAllInternal(DELETE, Collections.singleton(key), null, - null, + expiryPerCall(), true, false, filter, @@ -410,7 +410,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updateAllInternal(DELETE, keys, null, - null, + expiryPerCall(), false, false, filter, @@ -437,7 +437,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, - null, + expiryPerCall(), false, false, filter, @@ -465,7 +465,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, - null, + expiryPerCall(), false, false, ctx.equalsPeekArray(val), @@ -686,10 +686,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { final GridCacheOperation op = transformMap != null ? TRANSFORM : UPDATE; + final Collection<? extends K> keys = map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null; + final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null; + final boolean storeEnabled = ctx.isStoreEnabled(); + final ExpiryPolicy expiry = expiryPerCall(); return asyncOp(new Callable<Object>() { @@ -723,12 +727,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ) { final boolean storeEnabled = ctx.isStoreEnabled(); + final ExpiryPolicy expiryPlc = expiryPerCall(); + return asyncOp(new Callable<Object>() { @Override public Object call() throws Exception { return updateAllInternal(DELETE, keys, null, - null, + expiryPlc, retval, rawRetval, filter, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java index b2ae55b..b238600 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java @@ -45,7 +45,7 @@ public class GridTcpCommunicationMessageFactory { private static final Map<Byte, GridTcpCommunicationMessageProducer> CUSTOM = new ConcurrentHashMap8<>(); /** */ - public static final int MAX_COMMON_TYPE = 81; + public static final int MAX_COMMON_TYPE = 100; static { registerCommon(new GridTcpCommunicationMessageProducer() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 6c17203..326b232 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -142,6 +142,12 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } accessGetAll(); + + for (final Integer key : keys()) { + log.info("Test filter access [key=" + key + ']'); + + filterAccessRemove(key); + } } /** @@ -155,7 +161,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 60_000L); - assertEquals((Integer)1, cache.get(key)); + assertEquals((Integer) 1, cache.get(key)); checkTtl(key, 62_000L, true); @@ -167,6 +173,22 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } /** + * @param key Key. + * @throws Exception If failed. + */ + private void filterAccessRemove(Integer key) throws Exception { + IgniteCache<Integer, Integer> cache = jcache(); + + cache.put(key, 1); + + checkTtl(key, 60_000L); + + assertFalse(cache.remove(key, 2)); + + checkTtl(key, 62_000L, true); + } + + /** * @throws Exception If failed. */ private void accessGetAll() throws Exception { @@ -179,10 +201,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs cache.removeAll(vals.keySet()); - for (Map.Entry<Integer, Integer> e : vals.entrySet()) - cache.put(e.getKey(), e.getValue()); - - //cache.putAll(vals); + cache.putAll(vals); for (Integer key : vals.keySet()) checkTtl(key, 60_000L); @@ -456,41 +475,52 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs if (cacheMode() != PARTITIONED) return; - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); - nearCache = true; - startGrids(); + testCreateUpdate(); + + nearReaderUpdate(); - Integer key = nearKey(cache(0)); + nearPutAll(); + } + + /** + * @throws Exception If failed. + */ + private void nearReaderUpdate() throws Exception { + log.info("Test near reader update."); + + Integer key = nearKeys(cache(0), 1, 500_000).get(0); - IgniteCache<Integer, Integer> jcache0 = jcache(0); + IgniteCache<Integer, Integer> cache0 = jcache(0); - jcache0.put(key, 1); + assertEquals(NEAR_PARTITIONED, cache(0).configuration().getDistributionMode()); + + cache0.put(key, 1); checkTtl(key, 60_000L); - IgniteCache<Integer, Integer> jcache1 = jcache(1); + IgniteCache<Integer, Integer> cache1 = jcache(1); // Update from another node. - jcache1.put(key, 2); + cache1.put(key, 2); checkTtl(key, 61_000L); // Update from another node with provided TTL. - jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3); + cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3); checkTtl(key, 1000L); waitExpired(key); // Try create again. - jcache0.put(key, 1); + cache0.put(key, 1); checkTtl(key, 60_000L); // Update from near node with provided TTL. - jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2); + cache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2); checkTtl(key, 1100L); @@ -500,38 +530,31 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs /** * @throws Exception If failed. */ - public void testNearPutAll() throws Exception { - if (cacheMode() != PARTITIONED) - return; - - factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); - - nearCache = true; - - startGrids(); - + private void nearPutAll() throws Exception { Map<Integer, Integer> vals = new HashMap<>(); for (int i = 0; i < 1000; i++) vals.put(i, i); - IgniteCache<Integer, Integer> jcache0 = jcache(0); + IgniteCache<Integer, Integer> cache0 = jcache(0); - jcache0.putAll(vals); + cache0.removeAll(vals.keySet()); + + cache0.putAll(vals); for (Integer key : vals.keySet()) checkTtl(key, 60_000L); - IgniteCache<Integer, Integer> jcache1 = jcache(1); + IgniteCache<Integer, Integer> cache1 = jcache(1); // Update from another node. - jcache1.putAll(vals); + cache1.putAll(vals); for (Integer key : vals.keySet()) checkTtl(key, 61_000L); // Update from another node with provided TTL. - jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals); + cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals); for (Integer key : vals.keySet()) checkTtl(key, 1000L); @@ -539,10 +562,10 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs waitExpired(vals.keySet()); // Try create again. - jcache0.putAll(vals); + cache0.putAll(vals); // Update from near node with provided TTL. - jcache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals); + cache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals); for (Integer key : vals.keySet()) checkTtl(key, 1101L); @@ -551,6 +574,36 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs } /** + * @throws Exception If failed. + */ + public void testNearAccess() throws Exception { + if (cacheMode() != PARTITIONED) + return; + + nearCache = true; + + testAccess(); + + Integer key = primaryKeys(cache(0), 1, 500_000).get(0); + + IgniteCache<Integer, Integer> cache0 = jcache(0); + + cache0.put(key, 1); + + checkTtl(key, 60_000L); + + assertEquals(1, jcache(1).get(key)); + + checkTtl(key, 62_000L, true); + + assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, 1000L)).get(key)); + + checkTtl(key, 1000L, true); + + waitExpired(key); + } + + /** * @return Test keys. * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java deleted file mode 100644 index a6b3f8f..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ -* __ ____/___________(_)______ /__ ____/______ ____(_)_______ -* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ -* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / -* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ -*/ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.store.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.testframework.*; - -/** - * Entry time-to-live abstract test. - */ -public abstract class GridCacheAbstractTtlSelfTest extends GridCacheAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override protected GridCacheStore<?, ?> cacheStore() { - return null; - } - - /** - * @throws Exception If failed. - */ - public void testGetExpired() throws Exception { - final GridCache<String, Integer> c = cache(); - - final String key = "1"; - - int ttl = 500; - - GridCacheEntry<String, Integer> entry = c.entry(key); - - entry.timeToLive(ttl); - - entry.setValue(1); - - checkKeyIsRetired(key, ttl); - } - - /** - * @throws Exception If failed. - */ - public void testGetExpiredTx() throws Exception { - GridCache<String, Integer> c = cache(); - - String key = "1"; - int ttl = 500; - - try (GridCacheTx tx = c.txStart()) { - GridCacheEntry<String, Integer> entry = c.entry(key); - - entry.timeToLive(ttl); - - entry.setValue(1); - - tx.commit(); - } - - checkKeyIsRetired(key, ttl); - } - - /** - * Checks if the given cache has entry with the given key with some timeout based on the given TTL. - * - * @param key Key to be checked. - * @param ttl Base value for timeout before checking starts. - * @throws Exception If failed - */ - private void checkKeyIsRetired(final String key, int ttl) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { - @Override public boolean applyx() throws IgniteCheckedException { - for (int i = 0; i < gridCount(); i++) { - if (cache(i).get(key) != null) { - info("Key is still in cache of grid " + i); - - return false; - } - } - - return true; - } - }, ttl * 4)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java index 6a21fe7..a7b970d 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java @@ -403,7 +403,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme Object transformClo, String taskName, IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable GridCacheAccessExpiryPolicy expiryPlc) { + @Nullable GridCacheExpiryPolicy expiryPlc) { return val; } @@ -427,7 +427,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme @Nullable Object writeObj, boolean writeThrough, boolean retval, - ExpiryPolicy expiryPlc, + @Nullable ExpiryPolicy expiryPlc, boolean evt, boolean metrics, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, @@ -448,7 +448,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme @Nullable byte[] valBytes, boolean writeThrough, boolean retval, - ExpiryPolicy expiryPlc, + @Nullable GridCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java deleted file mode 100644 index d74c04e..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java +++ /dev/null @@ -1,25 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ -* __ ____/___________(_)______ /__ ____/______ ____(_)_______ -* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ -* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / -* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ -*/ - -package org.gridgain.grid.kernal.processors.cache.distributed.near; - -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.processors.cache.*; - -import static org.gridgain.grid.cache.GridCacheMode.*; - -/** - * Entry time-to-live test for partitioned cache. - */ -public class GridCachePartitionedTtlSelfTest extends GridCacheAbstractTtlSelfTest { - /** {@inheritDoc} */ - @Override protected GridCacheMode cacheMode() { - return PARTITIONED; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java deleted file mode 100644 index 1c71c4a..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java +++ /dev/null @@ -1,24 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ -* __ ____/___________(_)______ /__ ____/______ ____(_)_______ -* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ -* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / -* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ -*/ - -package org.gridgain.grid.kernal.processors.cache.distributed.replicated; - -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.processors.cache.*; -import static org.gridgain.grid.cache.GridCacheMode.*; - -/** - * Entry time-to-live test for replicated cache. - */ -public class GridCacheReplicatedTtlSelfTest extends GridCacheAbstractTtlSelfTest { - /** {@inheritDoc} */ - @Override protected GridCacheMode cacheMode() { - return REPLICATED; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java deleted file mode 100644 index 9c599c1..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java +++ /dev/null @@ -1,25 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ -* __ ____/___________(_)______ /__ ____/______ ____(_)_______ -* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ -* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / -* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ -*/ - -package org.gridgain.grid.kernal.processors.cache.local; - -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.processors.cache.*; - -import static org.gridgain.grid.cache.GridCacheMode.*; - -/** - * Entry time-to-live test for local cache. - */ -public class GridCacheLocalTtlSelfTest extends GridCacheAbstractTtlSelfTest { - /** {@inheritDoc} */ - @Override protected GridCacheMode cacheMode() { - return LOCAL; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java index e958f77..94dd70b 100644 --- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java +++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java @@ -149,7 +149,6 @@ public class GridDataGridTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCachePartitionedTxSingleThreadedSelfTest.class)); suite.addTest(new TestSuite(GridCacheColocatedTxSingleThreadedSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedTxTimeoutSelfTest.class)); - suite.addTest(new TestSuite(GridCachePartitionedTtlSelfTest.class)); suite.addTest(new TestSuite(GridCacheFinishPartitionsSelfTest.class)); suite.addTest(new TestSuite(GridCacheDhtEntrySelfTest.class)); suite.addTest(new TestSuite(GridCacheDhtInternalEntrySelfTest.class)); @@ -242,7 +241,6 @@ public class GridDataGridTestSuite extends TestSuite { suite.addTestSuite(GridCacheReplicatedEvictionEventSelfTest.class); // TODO: GG-7569. // suite.addTestSuite(GridCacheReplicatedTxMultiThreadedSelfTest.class); - suite.addTestSuite(GridCacheReplicatedTtlSelfTest.class); suite.addTestSuite(GridCacheReplicatedPreloadEventsSelfTest.class); suite.addTestSuite(GridCacheReplicatedPreloadStartStopEventsSelfTest.class); // TODO: GG-7434 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java index 2883215..94bd4fb 100644 --- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java @@ -32,6 +32,7 @@ import org.gridgain.testframework.junits.common.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -286,17 +287,10 @@ public abstract class GridCacheAbstractQuerySelfTest extends GridCommonAbstractT * @throws Exception If failed. */ public void testExpiration() throws Exception { - GridCache<String, Integer> cache = ignite.cache(null); - - GridCacheEntry<String, Integer> entry = cache.entry("key1"); - - assert entry != null; + ignite.jcache(null). + withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 1000))).put("key1", 1); - entry.timeToLive(1000); - - entry.set(1); - - assert entry.isCached(); + GridCache<String, Integer> cache = ignite.cache(null); GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "1=1");
