Repository: incubator-ignite Updated Branches: refs/heads/ignite-41 764297932 -> 18b5b5a93
# ignite-41 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/18b5b5a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/18b5b5a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/18b5b5a9 Branch: refs/heads/ignite-41 Commit: 18b5b5a93159e2a45e558083f7a975019d71fc87 Parents: 7642979 Author: sboikov <[email protected]> Authored: Thu Dec 18 07:27:44 2014 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 18 07:27:44 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 6 ++++ .../processors/cache/GridCacheMapEntry.java | 13 +++++++++ .../distributed/GridCacheTtlUpdateRequest.java | 27 ++++++++++++++++++ .../distributed/dht/GridDhtCacheAdapter.java | 22 ++++++++++++++- .../dht/GridPartitionedGetFuture.java | 12 +++++--- .../dht/atomic/GridDhtAtomicCache.java | 10 ++++--- .../dht/colocated/GridDhtColocatedCache.java | 3 +- .../distributed/near/GridNearGetFuture.java | 3 +- .../distributed/near/GridNearGetRequest.java | 21 +++++++++++++- .../IgniteCacheExpiryPolicyAbstractTest.java | 29 +++++++++++++++++++- .../processors/cache/GridCacheTestEntryEx.java | 5 ++++ 11 files changed, 138 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java index 2b38247..c6e3ea6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java @@ -873,6 +873,12 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware { public long ttl() throws GridCacheEntryRemovedException; /** + * @param ver Version. + * @param ttl Time to live. + */ + public void updateTtl(GridCacheVersion ver, long ttl); + + /** * @return Value. * @throws IgniteCheckedException If failed to read from swap storage. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 00f7382..e4ccc11 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -3428,6 +3428,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } /** {@inheritDoc} */ + @Override public void updateTtl(GridCacheVersion ver, long ttl) { + synchronized (this) { + try { + if (ver.equals(version())) + updateTtl(ttl); + } + catch (GridCacheEntryRemovedException ignored) { + // No-op. + } + } + } + + /** {@inheritDoc} */ @Override public synchronized void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException { checkObsolete(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index 71e314e..c862904 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -56,6 +56,13 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { } /** + * @return TTL. + */ + public long ttl() { + return ttl; + } + + /** * @param key Key. * @param keyBytes Key bytes. * @param ver Version. @@ -83,6 +90,26 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { return keys; } + /** + * @param idx Entry index. + * @return Key. + */ + public K key(int idx) { + assert idx >= 0 && idx < keys.size() : idx; + + return keys.get(idx); + } + + /** + * @param idx Entry index. + * @return Version. + */ + public GridCacheVersion version(int idx) { + assert idx >= 0 && idx < vers.size() : idx; + + return vers.get(idx); + } + /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 3557d17..328a8d5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -106,7 +106,27 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param req Request. */ private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) { - log.info("Ttl update: " + req); + int size = req.keys().size(); + + for (int i = 0; i < size; i++) { + try { + GridCacheEntryEx<K, V> entry; + + if (ctx.isSwapOrOffheapEnabled()) { + entry = ctx.cache().entryEx(req.key(i), true); + + entry.unswap(true, false); + } + else + entry = ctx.cache().peekEx(req.key(i)); + + if (entry != null) + entry.updateTtl(req.version(i), req.ttl()); + } + catch (IgniteCheckedException e) { + log.error("Failed to unswap entry.", e); + } + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 5f6af05..34539d4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; @@ -89,6 +88,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** Whether to deserialize portable objects. */ private boolean deserializePortable; + /** */ + private GridCacheAccessExpiryPolicy expiry; + /** * Empty constructor required for {@link Externalizable}. */ @@ -114,11 +116,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters, @Nullable UUID subjId, String taskName, - boolean deserializePortable + boolean deserializePortable, + @Nullable GridCacheAccessExpiryPolicy expiry ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); - assert cctx != null; assert !F.isEmpty(keys); this.cctx = cctx; @@ -130,6 +132,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M this.subjId = subjId; this.deserializePortable = deserializePortable; this.taskName = taskName; + this.expiry = expiry; futId = IgniteUuid.randomUuid(); @@ -352,7 +355,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M topVer, filters, subjId, - taskName == null ? 0 : taskName.hashCode()); + taskName == null ? 0 : taskName.hashCode(), + expiry != null ? expiry.ttl() : -1L); add(fut); // Append new future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6287e16..1d49097 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -729,15 +729,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long topVer = ctx.affinity().affinityTopologyVersion(); + final GridCacheAccessExpiryPolicy expiry = + GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f); boolean success = true; - final GridCacheAccessExpiryPolicy expiry = - GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); - // Optimistically expect that all keys are available locally (avoid creation of get future). for (K key : keys) { GridCacheEntryEx<K, V> entry = null; @@ -819,6 +819,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { GridCacheTtlUpdateRequest<K, V> req = expiry.request(); + assert req != null; assert !F.isEmpty(req.keys()); Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1); @@ -847,7 +848,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filter, subjId, taskName, - deserializePortable); + deserializePortable, + expiry); fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 1052e1d..84f3165 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -319,7 +319,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte filter, subjId, taskName, - deserializePortable); + deserializePortable, + null); fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java index 1f1de06..d23236a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java @@ -350,7 +350,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma topVer, filters, subjId, - taskName == null ? 0 : taskName.hashCode()); + taskName == null ? 0 : taskName.hashCode(), + -1L); add(fut); // Append new future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java index 12aacb7..552012b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java @@ -71,6 +71,9 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements @GridDirectVersion(2) private int taskNameHash; + /** */ + private long accessTtl; + /** * Empty constructor required for {@link Externalizable}. */ @@ -98,7 +101,8 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements long topVer, IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, - int taskNameHash + int taskNameHash, + long accessTtl ) { assert futId != null; assert miniId != null; @@ -115,6 +119,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements this.filter = filter; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.accessTtl = accessTtl; } /** @@ -364,6 +369,12 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements commState.idx++; + case 12: + if (!commState.putLong(accessTtl)) + return false; + + commState.idx++; + } return true; @@ -513,6 +524,14 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements commState.idx++; + case 12: + if (buf.remaining() < 8) + return false; + + accessTtl = commState.getLong(); + + commState.idx++; + } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 93b0405..5ab6033 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -16,6 +16,7 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.*; import org.jetbrains.annotations.*; @@ -150,7 +151,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs assertEquals((Integer)1, cache.get(key)); - checkTtl(key, 62_000L); + checkTtl(key, 62_000L, true); } /** @@ -578,6 +579,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs * @throws Exception If failed. */ private void checkTtl(Object key, long ttl) throws Exception { + checkTtl(key, ttl, false); + } + + /** + * @param key Key. + * @param ttl TTL. + * @throws Exception If failed. + */ + private void checkTtl(Object key, final long ttl, boolean wait) throws Exception { boolean found = false; for (int i = 0; i < gridCount(); i++) { @@ -595,6 +605,23 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs else { found = true; + if (wait) { + final GridCacheEntryEx<Object, Object> e0 = e; + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + try { + return e0.ttl() == ttl; + } + catch (Exception e) { + fail("Unexpected error: " + e); + + return true; + } + } + }, 3000); + } + assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl()); if (ttl > 0) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java index bbce8ca..6a21fe7 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java @@ -763,6 +763,11 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme return ttl; } + /** @inheritDoc */ + @Override public void updateTtl(GridCacheVersion ver, long ttl) { + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @Override public V unswap() throws IgniteCheckedException { return null;
