Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-41 784fadaf7 -> eac503e7b


# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eac503e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eac503e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eac503e7

Branch: refs/heads/ignite-41
Commit: eac503e7b4e29d74addbdf31150ef6384360c28f
Parents: 784fada
Author: sboikov <[email protected]>
Authored: Fri Dec 19 15:27:22 2014 +0300
Committer: sboikov <[email protected]>
Committed: Fri Dec 19 16:08:06 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  1 +
 .../processors/cache/GridCacheMapEntry.java     |  9 +++--
 .../processors/cache/GridCacheProxyImpl.java    | 11 +++++-
 .../cache/GridCacheTxLocalAdapter.java          | 31 ++++++++-------
 .../dht/atomic/GridDhtAtomicCache.java          |  6 +--
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  4 ++
 .../handlers/cache/GridCacheCommandHandler.java | 41 +++++++-------------
 .../cache/GridCacheTtlManagerLoadTest.java      | 11 +++---
 .../GridCacheBasicOpAbstractTest.java           | 21 +++-------
 ...tomicClientOnlyMultiNodeFullApiSelfTest.java | 22 ++++-------
 ...eAtomicNearOnlyMultiNodeFullApiSelfTest.java | 12 ++----
 .../GridCachePartitionedEvictionSelfTest.java   | 22 +++++++----
 .../hadoop/jobtracker/GridHadoopJobTracker.java | 37 +++++++++++++++---
 13 files changed, 124 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f1d27f0..01133b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -49,6 +49,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, 
V>, Externalizable
     private GridCacheProjectionImpl<K, V> prj;
 
     /**
+     * @param ctx Context.
      * @param delegate Delegate.
      * @param prj Projection.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 8b79382..7944509 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1764,7 +1764,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                     if (drRes.isUseOld()) {
                         old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
 
-                        return new GridCacheUpdateAtomicResult<>(false, old, 
null, 0L, -1L, null, null, false);
+                        return new GridCacheUpdateAtomicResult<>(false, old, 
null, -1L, -1L, null, null, false);
                     }
 
                     newTtl = drRes.newTtl();
@@ -1930,13 +1930,13 @@ public abstract class GridCacheMapEntry<K, V> 
implements GridCacheEntryEx<K, V>
                 if (drRes == null) {
                     // Calculate TTL and expire time for local update.
                     if (drTtl >= 0L) {
-                        assert drExpireTime >= 0L;
+                        assert drExpireTime >= 0L : drExpireTime;
 
                         ttl0 = drTtl;
                         newExpireTime = drExpireTime;
                     }
                     else {
-                        assert drExpireTime == -1L;
+                        assert drExpireTime == -1L : drExpireTime;
 
                         if (expiryPlc != null)
                             newTtl = hadVal ? expiryPlc.forUpdate() : 
expiryPlc.forCreate();
@@ -1953,6 +1953,8 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                         }
                     }
                 }
+                else if (newTtl == -1L)
+                    ttl0 = ttlExtras();
 
                 // Try write-through.
                 if (writeThrough)
@@ -2496,6 +2498,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
         GridCacheVersion ver) {
         assert ver != null;
         assert Thread.holdsLock(this);
+        assert ttl >= 0 : ttl;
 
         long oldExpireTime = expireTimeExtras();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index 38cc2ca..3830679 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -1886,7 +1886,16 @@ public class GridCacheProxyImpl<K, V> implements 
GridCacheProxy<K, V>, Externali
 
     /** {@inheritDoc} */
     @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy 
plc) {
-        throw new UnsupportedOperationException();
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            GridCacheProjectionEx<K, V> prj0 = prj != null ? 
prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
+
+            return new GridCacheProxyImpl<>(ctx, prj0, 
(GridCacheProjectionImpl<K, V>)prj0);
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 1cafbed..3959419 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -652,20 +652,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> 
extends GridCacheTxAdapter<K
                                     V val = res.get2();
                                     byte[] valBytes = res.get3();
 
-                                    if (op == CREATE || op == UPDATE && 
txEntry.drExpireTime() == -1L) {
-                                        ExpiryPolicy expiry = txEntry.expiry();
-
-                                        if (expiry == null)
-                                            expiry = cacheCtx.expiry();
-
-                                        if (expiry != null) {
-                                            Duration duration = 
cached.hasValue() ?
-                                                expiry.getExpiryForUpdate() : 
expiry.getExpiryForCreation();
-
-                                            
txEntry.ttl(GridCacheMapEntry.toTtl(duration));
-                                        }
-                                    }
-
                                     // Deal with DR conflicts.
                                     GridCacheVersion explicitVer = 
txEntry.drVersion() != null ?
                                         txEntry.drVersion() : writeVersion();
@@ -687,10 +673,25 @@ public abstract class GridCacheTxLocalAdapter<K, V> 
extends GridCacheTxAdapter<K
                                         if (drRes.isMerge())
                                             explicitVer = writeVersion();
                                     }
-                                    else
+                                    else {
                                         // Nullify explicit version so that 
innerSet/innerRemove will work as usual.
                                         explicitVer = null;
 
+                                        if (op == CREATE || op == UPDATE && 
txEntry.drExpireTime() == -1L) {
+                                            ExpiryPolicy expiry = 
txEntry.expiry();
+
+                                            if (expiry == null)
+                                                expiry = cacheCtx.expiry();
+
+                                            if (expiry != null) {
+                                                Duration duration = 
cached.hasValue() ?
+                                                    
expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
+
+                                                
txEntry.ttl(GridCacheMapEntry.toTtl(duration));
+                                            }
+                                        }
+                                    }
+
                                     if (sndTransformedVals || (drRes != null)) 
{
                                         assert sndTransformedVals && 
cacheCtx.isReplicated() || (drRes != null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d8694c6..334ecb0 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1498,8 +1498,6 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     req.subjectId(),
                     taskName);
 
-                assert updRes.newTtl() == -1L || (expiry != null || 
updRes.drExpireTime() >= 0);
-
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, 
true);
 
@@ -1630,6 +1628,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param res Response.
      * @param replicate Whether replication is enabled.
      * @param batchRes Batch update result.
+     * @param expiry Expiry policy.
      * @return Deleted entries.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
@@ -2361,6 +2360,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param res Dht atomic update response.
      */
+    @SuppressWarnings("unchecked")
     private void processDhtAtomicUpdateResponse(UUID nodeId, 
GridDhtAtomicUpdateResponse<K, V> res) {
         if (log.isDebugEnabled())
             log.debug("Processing dht atomic update response [nodeId=" + 
nodeId + ", res=" + res + ']');
@@ -2417,7 +2417,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param plc Expiry policy.
      * @return Expiry policy wrapper.
      */
-    static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) {
+    private static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy 
plc) {
         return plc == null ? null : new UpdateExpiryPolicy(plc);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index fda44c9..0d44e03 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -203,6 +203,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> imp
      * @param keyBytes Key bytes, if key was already serialized.
      * @param val Value, {@code null} if should be removed.
      * @param valBytes Value bytes, {@code null} if should be removed.
+     * @param transformC Transform closure.
      * @param drTtl DR TTL (optional).
      * @param drExpireTime DR expire time (optional).
      * @param drVer DR version (optional).
@@ -267,6 +268,9 @@ public class GridDhtAtomicUpdateRequest<K, V> extends 
GridCacheMessage<K, V> imp
      * @param keyBytes Key bytes, if key was already serialized.
      * @param val Value, {@code null} if should be removed.
      * @param valBytes Value bytes, {@code null} if should be removed.
+     * @param transformC Transform closure.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
      */
     public void addNearWriteValue(K key,
         @Nullable byte[] keyBytes,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 1a68cc7..6f34ebd 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -28,6 +28,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -929,18 +930,12 @@ public class GridCacheCommandHandler extends 
GridRestCommandHandlerAdapter {
         /** {@inheritDoc} */
         @Override public IgniteFuture<?> applyx(GridCacheProjection<Object, 
Object> c, GridKernalContext ctx) {
             if (ttl != null) {
-                GridCacheEntry<Object, Object> entry = c.entry(key);
+                Duration duration = new Duration(TimeUnit.MILLISECONDS, ttl);
 
-                if (entry != null) {
-                    entry.timeToLive(ttl);
-
-                    return entry.setxAsync(val);
-                }
-                else
-                    return new GridFinishedFuture<Object>(ctx, false);
+                c = ((GridCacheProjectionEx<Object, 
Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration));
             }
-            else
-                return c.putxAsync(key, val);
+
+            return c.putxAsync(key, val);
         }
     }
 
@@ -971,16 +966,13 @@ public class GridCacheCommandHandler extends 
GridRestCommandHandlerAdapter {
 
         /** {@inheritDoc} */
         @Override public IgniteFuture<?> applyx(GridCacheProjection<Object, 
Object> c, GridKernalContext ctx) {
-            GridCacheEntry<Object, Object> entry = c.entry(key);
-
-            if (entry != null) {
-                if (ttl != null)
-                    entry.timeToLive(ttl);
+            if (ttl != null) {
+                Duration duration = new Duration(TimeUnit.MILLISECONDS, ttl);
 
-                return entry.setxIfAbsentAsync(val);
+                c = ((GridCacheProjectionEx<Object, 
Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration));
             }
-            else
-                return new GridFinishedFuture<Object>(ctx, false);
+
+            return c.putxIfAbsentAsync(key, val);
         }
     }
 
@@ -1011,16 +1003,13 @@ public class GridCacheCommandHandler extends 
GridRestCommandHandlerAdapter {
 
         /** {@inheritDoc} */
         @Override public IgniteFuture<?> applyx(GridCacheProjection<Object, 
Object> c, GridKernalContext ctx) {
-            GridCacheEntry<Object, Object> entry = c.entry(key);
-
-            if (entry != null) {
-                if (ttl != null)
-                    entry.timeToLive(ttl);
+            if (ttl != null) {
+                Duration duration = new Duration(TimeUnit.MILLISECONDS, ttl);
 
-                return entry.replacexAsync(val);
+                c = ((GridCacheProjectionEx<Object, 
Object>)c).withExpiryPolicy(new ModifiedExpiryPolicy(duration));
             }
-            else
-                return new GridFinishedFuture<Object>(ctx, false);
+
+            return c.replacexAsync(key, val);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java
index 371ad91..ccc5a84 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTtlManagerLoadTest.java
@@ -9,11 +9,12 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.util.typedef.internal.*;
 
+import javax.cache.expiry.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -36,15 +37,13 @@ public class GridCacheTtlManagerLoadTest extends 
GridCacheTtlManagerSelfTest {
 
             IgniteFuture<?> fut = multithreadedAsync(new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    GridCache<Object,Object> cache = g.cache(null);
+                    IgniteCache<Object,Object> cache = g.jcache(null).
+                        withExpiryPolicy(new TouchedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, 1000)));
 
                     long key = 0;
 
                     while (!stop.get()) {
-                        GridCacheEntry<Object, Object> entry = 
cache.entry(key);
-
-                        entry.timeToLive(1000);
-                        entry.setValue(key);
+                        cache.put(key, key);
 
                         key++;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
index 53c6ff0..bd473eb 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
@@ -13,7 +13,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -21,6 +20,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.testframework.junits.common.*;
 
+import javax.cache.expiry.*;
 import java.util.concurrent.*;
 
 import static java.util.concurrent.TimeUnit.*;
@@ -313,26 +313,17 @@ public abstract class GridCacheBasicOpAbstractTest 
extends GridCommonAbstractTes
      * @throws Exception In case of error.
      */
     public void testPutWithExpiration() throws Exception {
-        GridCache<String, String> cache1 = ignite1.cache(null);
-        GridCache<String, String> cache2 = ignite2.cache(null);
-        GridCache<String, String> cache3 = ignite3.cache(null);
+        IgniteCache<String, String> cache1 = ignite1.jcache(null);
+        IgniteCache<String, String> cache2 = ignite2.jcache(null);
+        IgniteCache<String, String> cache3 = ignite3.jcache(null);
 
-        GridCacheTx tx = cache1.txStart();
+        GridCacheTx tx = ignite1.transactions().txStart();
 
         cache1.put("key", "val");
 
-        GridCacheEntry<String, String> entry = cache1.entry("key");
-
-        assert entry != null;
-
         long ttl = 500;
 
-        entry.timeToLive(ttl);
-
-        // Must update value for TTL to have effect.
-        entry.set("val");
-
-        assert entry.timeToLive() == ttl;
+        cache1.withExpiryPolicy(new TouchedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, ttl))).put("key", "val");
 
         assert cache1.get("key") != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
index b3e05c4..042a09d 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
@@ -15,7 +15,9 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 
+import javax.cache.expiry.*;
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
@@ -222,16 +224,10 @@ public class 
GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
 
         assertEquals((Integer)1, cache.get(key));
 
-        GridCacheEntry<String, Integer> entry = cache.entry(key);
-
-        assert entry != null;
-
         long ttl = 500;
 
-        entry.timeToLive(ttl);
-
-        // Update is required for TTL to have effect.
-        entry.set(1);
+        grid(0).jcache(null).
+            withExpiryPolicy(new TouchedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, ttl))).put(key, 1);
 
         Thread.sleep(ttl + 100);
 
@@ -337,14 +333,10 @@ public class 
GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
 
         assertEquals(null, c.peek(key));
 
-        int ttl = 500;
-
-        GridCacheEntry<String, Integer> entry = c.entry(key);
-
-        entry.timeToLive(ttl);
+        long ttl = 500;
 
-        // Update is required for TTL to have effect.
-        entry.set(1);
+        grid(0).jcache(null).
+            withExpiryPolicy(new TouchedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, ttl))).put(key, 1);
 
         Thread.sleep(ttl + 100);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java
index cb9ba90..94d1384 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.java
@@ -12,7 +12,9 @@ package 
org.gridgain.grid.kernal.processors.cache.distributed.near;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.util.typedef.*;
 
+import javax.cache.expiry.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.gridgain.grid.cache.GridCacheAtomicityMode.ATOMIC;
 import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
@@ -150,16 +152,10 @@ public class 
GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest extends GridCacheNe
 
         assertEquals((Integer)1, cache.get(key));
 
-        GridCacheEntry<String, Integer> entry = cache.entry(key);
-
-        assert entry != null;
-
         long ttl = 500;
 
-        entry.timeToLive(ttl);
-
-        // Update is required for TTL to have effect.
-        entry.set(1);
+        grid(0).jcache(null).
+            withExpiryPolicy(new TouchedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, ttl))).put(key, 1);
 
         Thread.sleep(ttl + 100);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java
index 18f3e83..3f1479b 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedEvictionSelfTest.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.cache.distributed.near;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.gridgain.grid.cache.*;
@@ -21,6 +22,9 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.gridgain.grid.util.typedef.*;
 
+import javax.cache.expiry.*;
+import java.util.concurrent.*;
+
 import static org.gridgain.grid.cache.GridCacheMode.*;
 import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
 import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
@@ -82,8 +86,8 @@ public class GridCachePartitionedEvictionSelfTest extends 
GridCacheAbstractSelfT
      * @param node Node.
      * @return Cache.
      */
-    private GridCacheProjection<String, Integer> cache(ClusterNode node) {
-        return G.ignite(node.id()).cache(null);
+    private IgniteCache<String, Integer> cache(ClusterNode node) {
+        return G.ignite(node.id()).jcache(null);
     }
 
     /**
@@ -158,17 +162,21 @@ public class GridCachePartitionedEvictionSelfTest extends 
GridCacheAbstractSelfT
 
         GridCacheAffinity<String> aff = dht0.affinity();
 
+        TouchedExpiryPolicy plc = new TouchedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, 10));
+
         for (int kv = 0; kv < KEY_CNT; kv++) {
             String key = String.valueOf(kv);
 
-            GridCacheProjection<String, Integer> c = 
cache(aff.mapKeyToNode(key));
+            ClusterNode node = aff.mapKeyToNode(key);
 
-            try (GridCacheTx tx = c.txStart(concurrency, isolation)) {
-                assert c.get(key) == null;
+            IgniteCache<String, Integer> c = cache(node);
 
-                c.put(key, kv);
+            IgniteTransactions txs = G.ignite(node.id()).transactions();
+
+            try (GridCacheTx tx = txs.txStart(concurrency, isolation)) {
+                assert c.get(key) == null;
 
-                c.entry(key).timeToLive(10);
+                c.withExpiryPolicy(plc).put(key, 1);
 
                 assertEquals(Integer.valueOf(kv), c.get(key));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eac503e7/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
 
b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index c2ee568..ea3797c 100644
--- 
a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ 
b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -12,11 +12,11 @@ package 
org.gridgain.grid.kernal.processors.hadoop.jobtracker;
 import org.apache.ignite.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.hadoop.*;
 import org.gridgain.grid.kernal.managers.eventstorage.*;
+import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.hadoop.*;
 import org.gridgain.grid.kernal.processors.hadoop.counter.*;
 import org.gridgain.grid.kernal.processors.hadoop.taskexecutor.*;
@@ -28,10 +28,12 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static java.util.concurrent.TimeUnit.*;
 import static org.gridgain.grid.hadoop.GridHadoopJobPhase.*;
 import static org.gridgain.grid.hadoop.GridHadoopTaskType.*;
 import static 
org.gridgain.grid.kernal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
@@ -46,6 +48,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent 
{
     /** */
     private volatile GridCacheProjection<GridHadoopJobId, 
GridHadoopJobMetadata> jobMetaPrj;
 
+    /** Projection with expiry policy for finished job updates. */
+    private volatile GridCacheProjection<GridHadoopJobId, 
GridHadoopJobMetadata> finishedJobMetaPrj;
+
     /** Map-reduce execution planner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private GridHadoopMapReducePlanner mrPlanner;
@@ -114,6 +119,12 @@ public class GridHadoopJobTracker extends 
GridHadoopComponent {
                     }
 
                     jobMetaPrj = prj = 
sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
+
+                    TouchedExpiryPolicy finishedJobPlc = new 
TouchedExpiryPolicy(
+                        new Duration(MILLISECONDS, 
ctx.configuration().getFinishedJobInfoTtl()));
+
+                    finishedJobMetaPrj = 
((GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)prj).
+                        withExpiryPolicy(finishedJobPlc);
                 }
             }
         }
@@ -121,6 +132,23 @@ public class GridHadoopJobTracker extends 
GridHadoopComponent {
         return prj;
     }
 
+    /**
+     * @return Projection with expiry policy for finished job updates.
+     */
+    private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> 
finishedJobMetaCache() {
+        GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = 
finishedJobMetaPrj;
+
+        if (prj == null) {
+            jobMetaCache();
+
+            prj = finishedJobMetaPrj;
+
+            assert prj != null;
+        }
+
+        return prj;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("deprecation")
     @Override public void onKernalStart() throws IgniteCheckedException {
@@ -430,11 +458,10 @@ public class GridHadoopJobTracker extends 
GridHadoopComponent {
 
                 case COMMIT:
                 case ABORT: {
-                    GridCacheEntry<GridHadoopJobId, GridHadoopJobMetadata> 
entry = jobMetaCache().entry(info.jobId());
-
-                    
entry.timeToLive(ctx.configuration().getFinishedJobInfoTtl());
+                    GridCacheProjection<GridHadoopJobId, 
GridHadoopJobMetadata> cache = finishedJobMetaCache();
 
-                    entry.transformAsync(new UpdatePhaseClosure(incrCntrs, 
PHASE_COMPLETE)).listenAsync(failsLog);
+                    cache.transformAsync(info.jobId(), new 
UpdatePhaseClosure(incrCntrs, PHASE_COMPLETE)).
+                        listenAsync(failsLog);
 
                     break;
                 }

Reply via email to