Repository: incubator-ignite Updated Branches: refs/heads/ignite-41 784fadaf7 -> eac503e7b
# ignite-41 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eac503e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eac503e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eac503e7 Branch: refs/heads/ignite-41 Commit: eac503e7b4e29d74addbdf31150ef6384360c28f Parents: 784fada Author: sboikov <[email protected]> Authored: Fri Dec 19 15:27:22 2014 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 19 16:08:06 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 1 + .../processors/cache/GridCacheMapEntry.java | 9 +++-- .../processors/cache/GridCacheProxyImpl.java | 11 +++++- .../cache/GridCacheTxLocalAdapter.java | 31 ++++++++------- .../dht/atomic/GridDhtAtomicCache.java | 6 +-- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 4 ++ .../handlers/cache/GridCacheCommandHandler.java | 41 +++++++------------- .../cache/GridCacheTtlManagerLoadTest.java | 11 +++--- .../GridCacheBasicOpAbstractTest.java | 21 +++------- ...tomicClientOnlyMultiNodeFullApiSelfTest.java | 22 ++++------- ...eAtomicNearOnlyMultiNodeFullApiSelfTest.java | 12 ++---- .../GridCachePartitionedEvictionSelfTest.java | 22 +++++++---- .../hadoop/jobtracker/GridHadoopJobTracker.java | 37 +++++++++++++++--- 13 files changed, 124 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f1d27f0..01133b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -49,6 +49,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable private GridCacheProjectionImpl<K, V> prj; /** + * @param ctx Context. * @param delegate Delegate. * @param prj Projection. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index 8b79382..7944509 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1764,7 +1764,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (drRes.isUseOld()) { old = retval ? rawGetOrUnmarshalUnlocked(false) : val; - return new GridCacheUpdateAtomicResult<>(false, old, null, 0L, -1L, null, null, false); + return new GridCacheUpdateAtomicResult<>(false, old, null, -1L, -1L, null, null, false); } newTtl = drRes.newTtl(); @@ -1930,13 +1930,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> if (drRes == null) { // Calculate TTL and expire time for local update. if (drTtl >= 0L) { - assert drExpireTime >= 0L; + assert drExpireTime >= 0L : drExpireTime; ttl0 = drTtl; newExpireTime = drExpireTime; } else { - assert drExpireTime == -1L; + assert drExpireTime == -1L : drExpireTime; if (expiryPlc != null) newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); @@ -1953,6 +1953,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> } } } + else if (newTtl == -1L) + ttl0 = ttlExtras(); // Try write-through. if (writeThrough) @@ -2496,6 +2498,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> GridCacheVersion ver) { assert ver != null; assert Thread.holdsLock(this); + assert ttl >= 0 : ttl; long oldExpireTime = expireTimeExtras(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java index 38cc2ca..3830679 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java @@ -1886,7 +1886,16 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) { - throw new UnsupportedOperationException(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); + + return new GridCacheProxyImpl<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java index 1cafbed..3959419 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java @@ -652,20 +652,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K V val = res.get2(); byte[] valBytes = res.get3(); - if (op == CREATE || op == UPDATE && txEntry.drExpireTime() == -1L) { - ExpiryPolicy expiry = txEntry.expiry(); - - if (expiry == null) - expiry = cacheCtx.expiry(); - - if (expiry != null) { - Duration duration = cached.hasValue() ? - expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); - - txEntry.ttl(GridCacheMapEntry.toTtl(duration)); - } - } - // Deal with DR conflicts. GridCacheVersion explicitVer = txEntry.drVersion() != null ? txEntry.drVersion() : writeVersion(); @@ -687,10 +673,25 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K if (drRes.isMerge()) explicitVer = writeVersion(); } - else + else { // Nullify explicit version so that innerSet/innerRemove will work as usual. explicitVer = null; + if (op == CREATE || op == UPDATE && txEntry.drExpireTime() == -1L) { + ExpiryPolicy expiry = txEntry.expiry(); + + if (expiry == null) + expiry = cacheCtx.expiry(); + + if (expiry != null) { + Duration duration = cached.hasValue() ? + expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); + + txEntry.ttl(GridCacheMapEntry.toTtl(duration)); + } + } + } + if (sndTransformedVals || (drRes != null)) { assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d8694c6..334ecb0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1498,8 +1498,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.subjectId(), taskName); - assert updRes.newTtl() == -1L || (expiry != null || updRes.drExpireTime() >= 0); - if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -1630,6 +1628,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param res Response. * @param replicate Whether replication is enabled. * @param batchRes Batch update result. + * @param expiry Expiry policy. * @return Deleted entries. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -2361,6 +2360,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param res Dht atomic update response. */ + @SuppressWarnings("unchecked") private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse<K, V> res) { if (log.isDebugEnabled()) log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); @@ -2417,7 +2417,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param plc Expiry policy. * @return Expiry policy wrapper. */ - static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { + private static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { return plc == null ? null : new UpdateExpiryPolicy(plc); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index fda44c9..0d44e03 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -203,6 +203,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param keyBytes Key bytes, if key was already serialized. * @param val Value, {@code null} if should be removed. * @param valBytes Value bytes, {@code null} if should be removed. + * @param transformC Transform closure. * @param drTtl DR TTL (optional). * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). @@ -267,6 +268,9 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp * @param keyBytes Key bytes, if key was already serialized. * @param val Value, {@code null} if should be removed. * @param valBytes Value bytes, {@code null} if should be removed. + * @param transformC Transform closure. + * @param ttl TTL. + * @param expireTime Expire time. */ public void addNearWriteValue(K key, @Nullable byte[] keyBytes, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 1a68cc7..6f34ebd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -28,6 +28,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -929,18 +930,12 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public IgniteFuture<?> applyx(GridCacheProjection<Object, Object> c, GridKernalContext ctx) { if (ttl != null) { - GridCacheEntry<Object, Object> entry = c.entry(key); + Duration duration = new Duration(TimeUnit.MILLISECONDS, ttl); - if (entry != null) { - entry.timeToLive(ttl); - - return entry.setxAsync(val); - } - else - return new GridFinishedFuture<Object>(ctx, false); + c = ((GridCacheProjectionEx<Object, Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration)); } - else - return c.putxAsync(key, val); + + return c.putxAsync(key, val); } } @@ -971,16 +966,13 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public IgniteFuture<?> applyx(GridCacheProjection<Object, Object> c, GridKernalContext ctx) { - GridCacheEntry<Object, Object> entry = c.entry(key); - - if (entry != null) { - if (ttl != null) - entry.timeToLive(ttl); + if (ttl != null) { + Duration duration = new Duration(TimeUnit.MILLISECONDS, ttl); - return entry.setxIfAbsentAsync(val); + c = ((GridCacheProjectionEx<Object, Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration)); } - else - return new GridFinishedFuture<Object>(ctx, false); + + return c.putxIfAbsentAsync(key, val); } } @@ -1011,16 +1003,13 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public IgniteFuture<?> applyx(GridCacheProjection<Object, Object> c, GridKernalContext ctx) { - GridCacheEntry<Object, Object> entry = c.entry(key); - - if (entry != null) { - if (ttl != null) - entry.timeToLive(ttl); + if (ttl != null) { + Duration duration = new Duration(TimeUnit.MILLISECONDS, ttl); - return entry.replacexAsync(val); + c = ((GridCacheProjectionEx<Object, Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration)); } - else - return new GridFinishedFuture<Object>(ctx, false); + + return c.replacexAsync(key, val); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java index 371ad91..ccc5a84 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java @@ -9,11 +9,12 @@ package org.gridgain.grid.kernal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.util.typedef.internal.*; +import javax.cache.expiry.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -36,15 +37,13 @@ public class GridCacheTtlManagerLoadTest extends GridCacheTtlManagerSelfTest { IgniteFuture<?> fut = multithreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { - GridCache<Object,Object> cache = g.cache(null); + IgniteCache<Object,Object> cache = g.jcache(null). + withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 1000))); long key = 0; while (!stop.get()) { - GridCacheEntry<Object, Object> entry = cache.entry(key); - - entry.timeToLive(1000); - entry.setValue(key); + cache.put(key, key); key++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java index 53c6ff0..bd473eb 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java @@ -13,7 +13,6 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; @@ -21,6 +20,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.testframework.junits.common.*; +import javax.cache.expiry.*; import java.util.concurrent.*; import static java.util.concurrent.TimeUnit.*; @@ -313,26 +313,17 @@ public abstract class GridCacheBasicOpAbstractTest extends GridCommonAbstractTes * @throws Exception In case of error. */ public void testPutWithExpiration() throws Exception { - GridCache<String, String> cache1 = ignite1.cache(null); - GridCache<String, String> cache2 = ignite2.cache(null); - GridCache<String, String> cache3 = ignite3.cache(null); + IgniteCache<String, String> cache1 = ignite1.jcache(null); + IgniteCache<String, String> cache2 = ignite2.jcache(null); + IgniteCache<String, String> cache3 = ignite3.jcache(null); - GridCacheTx tx = cache1.txStart(); + GridCacheTx tx = ignite1.transactions().txStart(); cache1.put("key", "val"); - GridCacheEntry<String, String> entry = cache1.entry("key"); - - assert entry != null; - long ttl = 500; - entry.timeToLive(ttl); - - // Must update value for TTL to have effect. - entry.set("val"); - - assert entry.timeToLive() == ttl; + cache1.withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, ttl))).put("key", "val"); assert cache1.get("key") != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java index b3e05c4..042a09d 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java @@ -15,7 +15,9 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; +import javax.cache.expiry.*; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; @@ -222,16 +224,10 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache assertEquals((Integer)1, cache.get(key)); - GridCacheEntry<String, Integer> entry = cache.entry(key); - - assert entry != null; - long ttl = 500; - entry.timeToLive(ttl); - - // Update is required for TTL to have effect. - entry.set(1); + grid(0).jcache(null). + withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, ttl))).put(key, 1); Thread.sleep(ttl + 100); @@ -337,14 +333,10 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache assertEquals(null, c.peek(key)); - int ttl = 500; - - GridCacheEntry<String, Integer> entry = c.entry(key); - - entry.timeToLive(ttl); + long ttl = 500; - // Update is required for TTL to have effect. - entry.set(1); + grid(0).jcache(null). + withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, ttl))).put(key, 1); Thread.sleep(ttl + 100); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java index cb9ba90..94d1384 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java @@ -12,7 +12,9 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near; import org.gridgain.grid.cache.*; import org.gridgain.grid.util.typedef.*; +import javax.cache.expiry.*; import java.util.*; +import java.util.concurrent.*; import static org.gridgain.grid.cache.GridCacheAtomicityMode.ATOMIC; import static org.gridgain.grid.cache.GridCacheDistributionMode.*; @@ -150,16 +152,10 @@ public class GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest extends GridCacheNe assertEquals((Integer)1, cache.get(key)); - GridCacheEntry<String, Integer> entry = cache.entry(key); - - assert entry != null; - long ttl = 500; - entry.timeToLive(ttl); - - // Update is required for TTL to have effect. - entry.set(1); + grid(0).jcache(null). + withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, ttl))).put(key, 1); Thread.sleep(ttl + 100); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java index 18f3e83..3f1479b 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near; +import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.gridgain.grid.cache.*; @@ -21,6 +22,9 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.gridgain.grid.util.typedef.*; +import javax.cache.expiry.*; +import java.util.concurrent.*; + import static org.gridgain.grid.cache.GridCacheMode.*; import static org.gridgain.grid.cache.GridCacheTxIsolation.*; import static org.gridgain.grid.cache.GridCacheTxConcurrency.*; @@ -82,8 +86,8 @@ public class GridCachePartitionedEvictionSelfTest extends GridCacheAbstractSelfT * @param node Node. * @return Cache. */ - private GridCacheProjection<String, Integer> cache(ClusterNode node) { - return G.ignite(node.id()).cache(null); + private IgniteCache<String, Integer> cache(ClusterNode node) { + return G.ignite(node.id()).jcache(null); } /** @@ -158,17 +162,21 @@ public class GridCachePartitionedEvictionSelfTest extends GridCacheAbstractSelfT GridCacheAffinity<String> aff = dht0.affinity(); + TouchedExpiryPolicy plc = new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 10)); + for (int kv = 0; kv < KEY_CNT; kv++) { String key = String.valueOf(kv); - GridCacheProjection<String, Integer> c = cache(aff.mapKeyToNode(key)); + ClusterNode node = aff.mapKeyToNode(key); - try (GridCacheTx tx = c.txStart(concurrency, isolation)) { - assert c.get(key) == null; + IgniteCache<String, Integer> c = cache(node); - c.put(key, kv); + IgniteTransactions txs = G.ignite(node.id()).transactions(); + + try (GridCacheTx tx = txs.txStart(concurrency, isolation)) { + assert c.get(key) == null; - c.entry(key).timeToLive(10); + c.withExpiryPolicy(plc).put(key, 1); assertEquals(Integer.valueOf(kv), c.get(key)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java index c2ee568..ea3797c 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java @@ -12,11 +12,11 @@ package org.gridgain.grid.kernal.processors.hadoop.jobtracker; import org.apache.ignite.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.hadoop.*; import org.gridgain.grid.kernal.managers.eventstorage.*; +import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.hadoop.*; import org.gridgain.grid.kernal.processors.hadoop.counter.*; import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.*; @@ -28,10 +28,12 @@ import org.gridgain.grid.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static java.util.concurrent.TimeUnit.*; import static org.gridgain.grid.hadoop.GridHadoopJobPhase.*; import static org.gridgain.grid.hadoop.GridHadoopTaskType.*; import static org.gridgain.grid.kernal.processors.hadoop.taskexecutor.GridHadoopTaskState.*; @@ -46,6 +48,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent { /** */ private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj; + /** Projection with expiry policy for finished job updates. */ + private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj; + /** Map-reduce execution planner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private GridHadoopMapReducePlanner mrPlanner; @@ -114,6 +119,12 @@ public class GridHadoopJobTracker extends GridHadoopComponent { } jobMetaPrj = prj = sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class); + + TouchedExpiryPolicy finishedJobPlc = new TouchedExpiryPolicy( + new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl())); + + finishedJobMetaPrj = ((GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)prj). + withExpiryPolicy(finishedJobPlc); } } } @@ -121,6 +132,23 @@ public class GridHadoopJobTracker extends GridHadoopComponent { return prj; } + /** + * @return Projection with expiry policy for finished job updates. + */ + private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() { + GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj; + + if (prj == null) { + jobMetaCache(); + + prj = finishedJobMetaPrj; + + assert prj != null; + } + + return prj; + } + /** {@inheritDoc} */ @SuppressWarnings("deprecation") @Override public void onKernalStart() throws IgniteCheckedException { @@ -430,11 +458,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent { case COMMIT: case ABORT: { - GridCacheEntry<GridHadoopJobId, GridHadoopJobMetadata> entry = jobMetaCache().entry(info.jobId()); - - entry.timeToLive(ctx.configuration().getFinishedJobInfoTtl()); + GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache(); - entry.transformAsync(new UpdatePhaseClosure(incrCntrs, PHASE_COMPLETE)).listenAsync(failsLog); + cache.transformAsync(info.jobId(), new UpdatePhaseClosure(incrCntrs, PHASE_COMPLETE)). + listenAsync(failsLog); break; }
