# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/784fadaf Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/784fadaf Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/784fadaf Branch: refs/heads/ignite-1 Commit: 784fadaf7f09a9b90c29dc40fa8e4fadda57b554 Parents: 3e62879 Author: sboikov <[email protected]> Authored: Fri Dec 19 12:22:00 2014 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 19 13:02:03 2014 +0300 ---------------------------------------------------------------------- .../cache/GridCacheAccessExpiryPolicy.java | 46 ++-- .../distributed/GridCacheTtlUpdateRequest.java | 213 +++++++++++++++++-- .../distributed/dht/GridDhtCacheAdapter.java | 80 ++++++- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../IgniteCacheExpiryPolicyAbstractTest.java | 42 ++-- 5 files changed, 326 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java index 2205108..ebac13c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java @@ -25,7 +25,10 @@ public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy { private final long accessTtl; /** */ - private volatile Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; + private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; + + /** */ + private Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrsMap; /** * @param expiryPlc Expiry policy. @@ -70,11 +73,12 @@ public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy { /** * */ - public void reset() { - Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries; + public synchronized void reset() { + if (entries != null) + entries.clear(); - if (entries0 != null) - entries0.clear(); + if (rdrsMap != null) + rdrsMap.clear(); } /** @@ -83,34 +87,42 @@ public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy { * @param ver Entry version. */ @SuppressWarnings("unchecked") - @Override public void onAccessUpdated(Object key, + @Override public synchronized void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver, @Nullable Collection<UUID> rdrs) { - Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries; + if (entries == null) + entries = new HashMap<>(); + + IgniteBiTuple<byte[], GridCacheVersion> t = new IgniteBiTuple<>(keyBytes, ver); + + entries.put(key, t); - if (entries0 == null) { - synchronized (this) { - entries0 = entries; + if (rdrs != null && !rdrs.isEmpty()) { + if (rdrsMap == null) + rdrsMap = new HashMap<>(); - if (entries0 == null) - entries0 = entries = new ConcurrentHashMap8<>(); + for (UUID nodeId : rdrs) { + Collection<IgniteBiTuple<byte[], GridCacheVersion>> col = rdrsMap.get(nodeId); + + if (col == null) + rdrsMap.put(nodeId, col = new ArrayList<>()); + + col.add(t); } } - - entries0.put(key, new IgniteBiTuple<>(keyBytes, ver)); } /** * @return TTL update request. */ - @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { + @Nullable @Override public synchronized Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { return entries; } /** {@inheritDoc} */ - @Nullable @Override public Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() { - return null; + @Nullable @Override public synchronized Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> readers() { + return rdrsMap; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index b2e9141..ce597bd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -23,30 +23,38 @@ import java.util.*; * */ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { - /** */ - @GridDirectCollection(byte[].class) - private List<byte[]> keysBytes; - - /** Entry keys. */ + /** Entries keys. */ @GridToStringInclude @GridDirectTransient private List<K> keys; - /** Entry versions. */ + /** Keys bytes. */ + @GridDirectCollection(byte[].class) + private List<byte[]> keysBytes; + + /** Entries versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> vers; - /** */ + /** Near entries keys. */ + @GridToStringInclude + @GridDirectTransient + private List<K> nearKeys; + + /** Near entries bytes. */ @GridDirectCollection(byte[].class) private List<byte[]> nearKeysBytes; - /** Versions for near entries. */ + /** Near entries versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> nearVers; /** New TTL. */ private long ttl; + /** Topology version. */ + private long topVer; + /** * Required empty constructor. */ @@ -55,15 +63,24 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { } /** + * @param topVer Topology version. * @param ttl TTL. */ - public GridCacheTtlUpdateRequest(long ttl) { + public GridCacheTtlUpdateRequest(long topVer, long ttl) { assert ttl >= 0 : ttl; + this.topVer = topVer; this.ttl = ttl; } /** + * @return Topology version. + */ + public long topologyVersion() { + return topVer; + } + + /** * @return TTL. */ public long ttl() { @@ -87,6 +104,22 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { } /** + * @param keyBytes Key bytes. + * @param ver Version. + */ + public void addNearEntry(byte[] keyBytes, GridCacheVersion ver) { + if (nearKeysBytes == null) { + nearKeysBytes = new ArrayList<>(); + + nearVers = new ArrayList<>(); + } + + nearKeysBytes.add(keyBytes); + + nearVers.add(ver); + } + + /** * @return Keys. */ public List<K> keys() { @@ -94,13 +127,10 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { } /** - * @param idx Entry index. - * @return Key. + * @return Versions. */ - public K key(int idx) { - assert idx >= 0 && idx < keys.size() : idx; - - return keys.get(idx); + public List<GridCacheVersion > versions() { + return vers; } /** @@ -113,6 +143,20 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { return vers.get(idx); } + /** + * @return Keys for near cache. + */ + public List<K> nearKeys() { + return nearKeys; + } + + /** + * @return Versions for near cache entries. + */ + public List<GridCacheVersion > nearVersions() { + return nearVers; + } + /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { @@ -120,6 +164,9 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { if (keys == null && keysBytes != null) keys = unmarshalCollection(keysBytes, ctx, ldr); + + if (nearKeys == null && nearKeysBytes != null) + nearKeys = unmarshalCollection(nearKeysBytes, ctx, ldr); } /** {@inheritDoc} */ @@ -180,12 +227,72 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { commState.idx++; case 4: + if (nearKeysBytes != null) { + if (commState.it == null) { + if (!commState.putInt(nearKeysBytes.size())) + return false; + + commState.it = nearKeysBytes.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putByteArray((byte[])commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 5: + if (nearVers != null) { + if (commState.it == null) { + if (!commState.putInt(nearVers.size())) + return false; + + commState.it = nearVers.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { + if (!commState.putInt(-1)) + return false; + } + + commState.idx++; + + case 6: + if (!commState.putLong(topVer)) + return false; + + commState.idx++; + + case 7: if (!commState.putLong(ttl)) return false; commState.idx++; - case 5: + case 8: if (vers != null) { if (commState.it == null) { if (!commState.putInt(vers.size())) @@ -255,6 +362,72 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { commState.idx++; case 4: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (nearKeysBytes == null) + nearKeysBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + nearKeysBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 5: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { + if (nearVers == null) + nearVers = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + GridCacheVersion _val = commState.getCacheVersion(); + + if (_val == CACHE_VER_NOT_READ) + return false; + + nearVers.add((GridCacheVersion)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 6: + if (buf.remaining() < 8) + return false; + + topVer = commState.getLong(); + + commState.idx++; + + case 7: if (buf.remaining() < 8) return false; @@ -262,7 +435,7 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { commState.idx++; - case 5: + case 8: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -302,10 +475,14 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> { GridCacheTtlUpdateRequest _clone = (GridCacheTtlUpdateRequest)_msg; - _clone.keysBytes = keysBytes; _clone.keys = keys; + _clone.keysBytes = keysBytes; _clone.vers = vers; + _clone.nearKeys = nearKeys; + _clone.nearKeysBytes = nearKeysBytes; + _clone.nearVers = nearVers; _clone.ttl = ttl; + _clone.topVer = topVer; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index e67c6dd..41e56c5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -622,7 +622,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); if (req == null) { - reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.forAccess())); + reqMap.put(node, + req = new GridCacheTtlUpdateRequest<>(topVer, expiryPlc.forAccess())); req.cacheId(ctx.cacheId()); } @@ -632,6 +633,30 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } } + Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrs = expiryPlc.readers(); + + if (rdrs != null) { + assert !rdrs.isEmpty(); + + for (Map.Entry<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> e : rdrs.entrySet()) { + ClusterNode node = ctx.node(e.getKey()); + + if (node != null) { + GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); + + if (req == null) { + reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(topVer, + expiryPlc.forAccess())); + + req.cacheId(ctx.cacheId()); + } + + for (IgniteBiTuple<byte[], GridCacheVersion> t : e.getValue()) + req.addNearEntry(t.get1(), t.get2()); + } + } + } + for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) { try { ctx.io().send(req.getKey(), req.getValue()); @@ -649,22 +674,55 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param req Request. */ private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) { - int size = req.keys().size(); + if (req.keys() != null) + updateTtl(this, req.keys(), req.versions(), req.ttl()); + + if (req.nearKeys() != null) { + GridNearCacheAdapter<K, V> near = near(); + + assert near != null; + + updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl()); + } + } + + /** + * @param cache Cache. + * @param keys Entries keys. + * @param vers Entries versions. + * @param ttl TTL. + */ + private void updateTtl(GridCacheAdapter<K, V> cache, + List<K> keys, + List<GridCacheVersion> vers, + long ttl) { + assert !F.isEmpty(keys); + assert keys.size() == vers.size(); + + int size = keys.size(); + + boolean swap = cache.context().isSwapOrOffheapEnabled(); for (int i = 0; i < size; i++) { try { - GridCacheEntryEx<K, V> entry; + GridCacheEntryEx<K, V> entry = null; - if (ctx.isSwapOrOffheapEnabled()) { - entry = ctx.cache().entryEx(req.key(i), true); + try { + if (swap) { + entry = cache.entryEx(keys.get(i)); - entry.unswap(true, false); - } - else - entry = ctx.cache().peekEx(req.key(i)); + entry.unswap(true, false); + } + else + entry = cache.peekEx(keys.get(i)); - if (entry != null) - entry.updateTtl(req.version(i), req.ttl()); + if (entry != null) + entry.updateTtl(vers.get(i), ttl); + } + finally { + if (entry != null) + cache.context().evicts().touch(entry, -1L); + } } catch (IgniteCheckedException e) { log.error("Failed to unswap entry.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 21afa83..d8694c6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2744,12 +2744,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entries == null) entries = new HashMap<>(); - IgniteBiTuple t = new IgniteBiTuple<>(keyBytes, ver); + IgniteBiTuple<byte[], GridCacheVersion> t = new IgniteBiTuple<>(keyBytes, ver); entries.put(key, t); - if (!F.isEmpty(rdrs)) { - if (rdrs == null) + if (rdrs != null && !rdrs.isEmpty()) { + if (rdrsMap == null) rdrsMap = new HashMap<>(); for (UUID nodeId : rdrs) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/784fadaf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index bffe6af..2d115c6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -45,6 +45,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs /** */ private Integer lastKey = 0; + /** */ + private static final long TTL_FOR_EXPIRE = 500L; + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { // No-op. @@ -120,9 +123,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 60_000L); - cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 1); // Update with custom. + cache.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE, null)).put(key, 1); // Update with custom. - checkTtl(key, 1000L); + checkTtl(key, TTL_FOR_EXPIRE); waitExpired(key); } @@ -171,9 +174,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 62_000L, true); - assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(1100L, 1200L, 1000L)).get(key)); + assertEquals((Integer)1, cache.withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key)); - checkTtl(key, 1000L, true); + checkTtl(key, TTL_FOR_EXPIRE, true); waitExpired(key); } @@ -415,12 +418,12 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs tx = startTx(txConcurrency); // Update with provided TTL. - cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2); + cache.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE, null)).put(key, 2); if (tx != null) tx.commit(); - checkTtl(key, 1000L); + checkTtl(key, TTL_FOR_EXPIRE); waitExpired(key); @@ -538,9 +541,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 61_000L); // Update from another node with provided TTL. - cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3); + cache1.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE, null)).put(key, 3); - checkTtl(key, 1000L); + checkTtl(key, TTL_FOR_EXPIRE); waitExpired(key); @@ -550,9 +553,9 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 60_000L); // Update from near node with provided TTL. - cache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2); + cache0.withExpiryPolicy(new TestPolicy(null, TTL_FOR_EXPIRE + 1, null)).put(key, 2); - checkTtl(key, 1100L); + checkTtl(key, TTL_FOR_EXPIRE + 1); waitExpired(key); } @@ -626,11 +629,26 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs checkTtl(key, 62_000L, true); - assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, 1000L)).get(key)); + assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, TTL_FOR_EXPIRE)).get(key)); - checkTtl(key, 1000L, true); + checkTtl(key, TTL_FOR_EXPIRE, true); waitExpired(key); + + // Test reader update on get. + + key = nearKeys(cache(0), 1, 600_000).get(0); + + cache0.put(key, 1); + + checkTtl(key, 60_000L); + + IgniteCache<Object, Object> cache = + cache(0).affinity().isPrimary(grid(1).localNode(), key) ? jcache(1) : jcache(2); + + assertEquals(1, cache.get(key)); + + checkTtl(key, 62_000L, true); } /**
