# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5f95bd2e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5f95bd2e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5f95bd2e Branch: refs/heads/ignite-1 Commit: 5f95bd2e5eb0227f1e329feb8b0c27ff3184f8c2 Parents: 18b5b5a Author: sboikov <[email protected]> Authored: Thu Dec 18 13:42:29 2014 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 18 14:58:50 2014 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 3 +- .../cache/GridCacheAccessExpiryPolicy.java | 35 +++- .../processors/cache/GridCacheAdapter.java | 33 +++- .../processors/cache/GridCacheMapEntry.java | 2 - .../processors/cache/GridCacheTxHandler.java | 17 +- .../cache/GridCacheTxLocalAdapter.java | 8 - .../GridDistributedTxRemoteAdapter.java | 39 ----- .../distributed/dht/GridDhtCacheAdapter.java | 162 ++++++++++++++----- .../cache/distributed/dht/GridDhtGetFuture.java | 70 ++++++-- .../distributed/dht/GridDhtTxFinishFuture.java | 21 +++ .../distributed/dht/GridDhtTxFinishRequest.java | 95 +++++++++++ .../cache/distributed/dht/GridDhtTxRemote.java | 18 ++- .../dht/GridPartitionedGetFuture.java | 23 ++- .../dht/atomic/GridDhtAtomicCache.java | 35 ++-- .../dht/colocated/GridDhtColocatedCache.java | 45 ++++-- .../distributed/near/GridNearCacheAdapter.java | 16 +- .../distributed/near/GridNearGetFuture.java | 39 +++-- .../distributed/near/GridNearGetRequest.java | 10 ++ .../near/GridNearTransactionalCache.java | 18 ++- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../near/GridNearTxPrepareRequest.java | 7 + .../distributed/near/GridNearTxRemote.java | 8 +- .../local/atomic/GridLocalAtomicCache.java | 18 ++- .../cache/IgniteCacheAbstractTest.java | 15 ++ .../IgniteCacheAtomicExpiryPolicyTest.java | 6 + ...AtomicPrimaryWriteOrderExpiryPolicyTest.java | 24 +++ .../IgniteCacheExpiryPolicyAbstractTest.java | 102 +++++++++--- .../IgniteCacheExpiryPolicyTestSuite.java | 1 + .../bamboo/GridDataGridTestSuite.java | 4 +- 29 files changed, 664 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 9ba7c45..87086a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -163,7 +163,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final int DFLT_PORT = 47100; /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */ - public static final int DFLT_SHMEM_PORT = 48100; + // FIXME IGNITE-41. + public static final int DFLT_SHMEM_PORT = -1; /** Default idle connection timeout (value is <tt>30000</tt>ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java index 0b6152d..07f4ae8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java @@ -9,11 +9,13 @@ package org.gridgain.grid.kernal.processors.cache; -import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.apache.ignite.lang.*; import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import java.util.*; /** * @@ -23,7 +25,7 @@ public class GridCacheAccessExpiryPolicy { private final long ttl; /** */ - private GridCacheTtlUpdateRequest req; + private volatile Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries; /** * @param expiryPlc Expiry policy. @@ -58,24 +60,41 @@ public class GridCacheAccessExpiryPolicy { } /** + * + */ + public void reset() { + Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries; + + if (entries0 != null) + entries0.clear(); + } + + /** * @param key Entry key. * @param keyBytes Entry key bytes. * @param ver Entry version. */ @SuppressWarnings("unchecked") public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) { - if (req == null) - req = new GridCacheTtlUpdateRequest(ttl); + Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries; + + if (entries0 == null) { + synchronized (this) { + entries0 = entries; - req.addEntry(key, keyBytes, ver); + if (entries0 == null) + entries0 = entries = new ConcurrentHashMap8<>(); + } + } + + entries0.put(key, new IgniteBiTuple<>(keyBytes, ver)); } /** * @return TTL update request. */ - @SuppressWarnings("unchecked") - @Nullable public <K, V> GridCacheTtlUpdateRequest<K, V> request() { - return (GridCacheTtlUpdateRequest<K, V>)req; + @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() { + return entries; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 1f40a7c..a4ec90f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -1737,16 +1737,37 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter ) { - subjId = ctx.subjectIdPerCall(subjId); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); - return getAllAsync(keys, entry, !skipTx, subjId, taskName, deserializePortable, forcePrimary, filter); + subjId = ctx.subjectIdPerCall(subjId, prj); + + ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; + + if (expiryPlc == null) + expiryPlc = ctx.expiry(); + + return getAllAsync(keys, + entry, + !skipTx, + subjId, + taskName, + deserializePortable, + forcePrimary, + GridCacheAccessExpiryPolicy.forPolicy(expiryPlc), + filter); } /** {@inheritDoc} */ public IgniteFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys, - @Nullable GridCacheEntryEx<K, V> cached, boolean checkTx, @Nullable final UUID subjId, final String taskName, - final boolean deserializePortable, final boolean forcePrimary, - @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter) { + @Nullable GridCacheEntryEx<K, V> cached, + boolean checkTx, + @Nullable final UUID subjId, + final String taskName, + final boolean deserializePortable, + final boolean forcePrimary, + @Nullable GridCacheAccessExpiryPolicy expiry, + @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter + ) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); ctx.denyOnFlag(LOCAL); @@ -1813,7 +1834,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im null, taskName, filter, - null); + expiry); GridCacheVersion ver = entry.version(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java index e4ccc11..c191dda 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java @@ -1127,8 +1127,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable UUID subjId, String taskName ) throws IgniteCheckedException, GridCacheEntryRemovedException { - // log.info("Inner set " + key + " " + val + " " + ttl); - V old; boolean valid = valid(tx != null ? tx.topologyVersion() : topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java index fa85566..170d7a3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java @@ -16,6 +16,7 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.*; @@ -759,10 +760,10 @@ public class GridCacheTxHandler<K, V> { if (nearTx != null && nearTx.local()) nearTx = null; - finish(nodeId, dhtTx, req, req.writes()); + finish(nodeId, dhtTx, req, req.writes(), req.ttls()); if (nearTx != null) - finish(nodeId, nearTx, req, req.nearWrites()); + finish(nodeId, nearTx, req, req.nearWrites(), req.nearTtls()); sendReply(nodeId, req); } @@ -772,12 +773,14 @@ public class GridCacheTxHandler<K, V> { * @param tx Transaction. * @param req Request. * @param writes Writes. + * @param ttls TTLs for optimistic transaction. */ protected void finish( UUID nodeId, GridCacheTxRemoteEx<K, V> tx, GridDhtTxFinishRequest<K, V> req, - Collection<GridCacheTxEntry<K, V>> writes) { + Collection<GridCacheTxEntry<K, V>> writes, + @Nullable GridLongList ttls) { // We don't allow explicit locks for transactions and // therefore immediately return if transaction is null. // However, we may decide to relax this restriction in @@ -799,6 +802,8 @@ public class GridCacheTxHandler<K, V> { log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req + ", tx=" + tx + ']'); + assert ttls == null || tx.concurrency() == OPTIMISTIC; + try { if (req.commit() || req.isSystemInvalidate()) { if (tx.commitVersion(req.commitVersion())) { @@ -819,6 +824,12 @@ public class GridCacheTxHandler<K, V> { entry + ", tx=" + tx + ']'); } } + else if (tx.concurrency() == OPTIMISTIC && ttls != null) { + int idx = 0; + + for (GridCacheTxEntry<K, V> e : tx.writeEntries()) + e.ttl(ttls.get(idx)); + } // Complete remote candidates. tx.doneRemote(req.baseVersion(), null, null, null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java index 1c96b32..479948c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java @@ -586,8 +586,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K addGroupTxMapping(writeSet()); if (!empty) { - log.info("User commit"); - // We are holding transaction-level locks for entries here, so we can get next write version. writeVersion(cctx.versions().next(topologyVersion())); @@ -666,10 +664,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K } } - // Preserve TTL if needed. - if (txEntry.ttl() < 0) - txEntry.ttl(cached.ttl()); - // Deal with DR conflicts. GridCacheVersion explicitVer = txEntry.drVersion() != null ? txEntry.drVersion() : writeVersion(); @@ -2736,8 +2730,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K old.cached(entry, old.keyBytes()); old.filters(filter); - long ttl = -1L; - // Update ttl if specified. if (drTtl >= 0L) { assert drExpireTime >= 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 3b34552..5fdafe9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -293,43 +293,6 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, } /** - * @param key Key to add to read set. - * @param keyBytes Key bytes. - * @param drVer Data center replication version. - */ - public void addRead(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key, byte[] keyBytes, @Nullable GridCacheVersion drVer) { - checkInternal(key); - - GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, READ, null, 0L, -1L, - cacheCtx.cache().entryEx(key.key()), drVer); - - txEntry.keyBytes(keyBytes); - - readMap.put(key, txEntry); - } - - /** - * @param key Key to add to write set. - * @param keyBytes Key bytes. - * @param op Cache operation. - * @param val Write value. - * @param valBytes Write value bytes. - * @param drVer Data center replication version. - */ - public void addWrite(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key, byte[] keyBytes, GridCacheOperation op, V val, byte[] valBytes, - @Nullable GridCacheVersion drVer) { - checkInternal(key); - - GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L, - cacheCtx.cache().entryEx(key.key()), drVer); - - txEntry.keyBytes(keyBytes); - txEntry.valueBytes(valBytes); - - writeMap.put(key, txEntry); - } - - /** * @param e Transaction entry to set. * @return {@code True} if value was set. */ @@ -446,8 +409,6 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, @SuppressWarnings({"CatchGenericClass"}) private void commitIfLocked() throws IgniteCheckedException { if (state() == COMMITTING) { - log.info("commitIfLocked"); - for (GridCacheTxEntry<K, V> txEntry : writeMap.values()) { assert txEntry != null : "Missing transaction entry for tx: " + this; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 328a8d5..fef3b82 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; @@ -102,33 +103,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap }); } - /** - * @param req Request. - */ - private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) { - int size = req.keys().size(); - - for (int i = 0; i < size; i++) { - try { - GridCacheEntryEx<K, V> entry; - - if (ctx.isSwapOrOffheapEnabled()) { - entry = ctx.cache().entryEx(req.key(i), true); - - entry.unswap(true, false); - } - else - entry = ctx.cache().peekEx(req.key(i)); - - if (entry != null) - entry.updateTtl(req.version(i), req.ttl()); - } - catch (IgniteCheckedException e) { - log.error("Failed to unswap entry.", e); - } - } - } - /** {@inheritDoc} */ @Override public void stop() { super.stop(); @@ -454,7 +428,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, org.apache.ignite.lang.IgnitePredicate[])} + * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], GridCacheAccessExpiryPolicy)} * method instead to retrieve DHT value. * * @param keys {@inheritDoc} @@ -473,8 +447,15 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter ) { - return getAllAsync(keys, null, /*don't check local tx. */false, subjId, taskName, deserializePortable, - forcePrimary, filter); + return getAllAsync(keys, + null, + /*don't check local tx. */false, + subjId, + taskName, + deserializePortable, + forcePrimary, + null, + filter); } /** {@inheritDoc} */ @@ -490,12 +471,28 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * @param keys Keys to get - * @param filter {@inheritDoc} - * @return {@inheritDoc} + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param filter Optional filter. + * @param expiry Expiry policy. + * @return Get future. */ - IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, @Nullable UUID subjId, - String taskName, boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - return getAllAsync(keys, null, /*don't check local tx. */false, subjId, taskName, deserializePortable, false, + IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable GridCacheAccessExpiryPolicy expiry + ) { + return getAllAsync(keys, + null, + /*don't check local tx. */false, + subjId, + taskName, + deserializePortable, + false, + expiry, filter); } @@ -505,7 +502,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param keys Keys to get. * @param reload Reload flag. * @param topVer Topology version. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param deserializePortable Deserialize portable flag. * @param filter Optional filter. + * @param expiry Expiry policy. * @return DHT future. */ public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, @@ -516,7 +517,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, int taskNameHash, boolean deserializePortable, - IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable GridCacheAccessExpiryPolicy expiry) { GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, reader, @@ -527,7 +529,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap filter, subjId, taskNameHash, - deserializePortable); + deserializePortable, + expiry); fut.init(); @@ -541,6 +544,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) { assert isAffinityNode(cacheCfg); + long ttl = req.accessTtl(); + + final GridCacheAccessExpiryPolicy expiryPlc = ttl == -1L ? null : new GridCacheAccessExpiryPolicy(ttl); + IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut = getDhtAsync(nodeId, req.messageId(), @@ -550,7 +557,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap req.subjectId(), req.taskNameHash(), false, - req.filter()); + req.filter(), + expiryPlc); fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() { @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) { @@ -582,10 +590,88 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + ",req=" + req + ", res=" + res + ']', e); } + + sendTtlUpdateRequest(expiryPlc); } }); } + /** + * @param expiryPlc Expiry policy. + */ + protected void sendTtlUpdateRequest(@Nullable final GridCacheAccessExpiryPolicy expiryPlc) { + if (expiryPlc != null && expiryPlc.entries() != null) { + ctx.closures().runLocalSafe(new Runnable() { + @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"}) + @Override public void run() { + Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries = expiryPlc.entries(); + + assert entries != null && !entries.isEmpty(); + + Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>(); + + long topVer = ctx.discovery().topologyVersion(); + + for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : entries.entrySet()) { + List<ClusterNode> nodes = ctx.affinity().nodes((K)e.getKey(), topVer); + + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + if (!node.isLocal()) { + GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node); + + if (req == null) { + reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.ttl())); + + req.cacheId(ctx.cacheId()); + } + + req.addEntry((K)e.getKey(), e.getValue().get1(), e.getValue().get2()); + } + } + } + + for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) { + try { + ctx.io().send(req.getKey(), req.getValue()); + } + catch (IgniteCheckedException e) { + log.error("Failed to send TTL update request.", e); + } + } + } + }); + } + } + + /** + * @param req Request. + */ + private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) { + int size = req.keys().size(); + + for (int i = 0; i < size; i++) { + try { + GridCacheEntryEx<K, V> entry; + + if (ctx.isSwapOrOffheapEnabled()) { + entry = ctx.cache().entryEx(req.key(i), true); + + entry.unswap(true, false); + } + else + entry = ctx.cache().peekEx(req.key(i)); + + if (entry != null) + entry.updateTtl(req.version(i), req.ttl()); + } + catch (IgniteCheckedException e) { + log.error("Failed to unswap entry.", e); + } + } + } + /** {@inheritDoc} */ @Override public void unlockAll(Collection<? extends K> keys, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java index 6db3540..2ce4b6c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -83,6 +83,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** Whether to deserialize portable objects. */ private boolean deserializePortable; + /** Expiry policy. */ + private GridCacheAccessExpiryPolicy expiryPlc; + /** * Empty constructor required for {@link Externalizable}. */ @@ -99,6 +102,10 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param tx Transaction. * @param topVer Topology version. * @param filters Filters. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. */ public GridDhtGetFuture( GridCacheContext<K, V> cctx, @@ -111,7 +118,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters, @Nullable UUID subjId, int taskNameHash, - boolean deserializePortable) { + boolean deserializePortable, + @Nullable GridCacheAccessExpiryPolicy expiryPlc) { super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer()); assert reader != null; @@ -128,6 +136,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col this.subjId = subjId; this.deserializePortable = deserializePortable; this.taskNameHash = taskNameHash; + this.expiryPlc = expiryPlc; futId = IgniteUuid.randomUuid(); @@ -325,11 +334,30 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col IgniteFuture<Map<K, V>> fut; if (txFut == null || txFut.isDone()) { - if (reload && cctx.isStoreEnabled() && cctx.store().configured()) - fut = cache().reloadAllAsync(keys.keySet(), true, subjId, taskName, filters); - else - fut = tx == null ? cache().getDhtAllAsync(keys.keySet(), subjId, taskName, deserializePortable, filters) : - tx.getAllAsync(cctx, keys.keySet(), null, deserializePortable, filters); + if (reload && cctx.isStoreEnabled() && cctx.store().configured()) { + fut = cache().reloadAllAsync(keys.keySet(), + true, + subjId, + taskName, + filters); + } + else { + if (tx == null) { + fut = cache().getDhtAllAsync(keys.keySet(), + subjId, + taskName, + deserializePortable, + filters, + expiryPlc); + } + else { + fut = tx.getAllAsync(cctx, + keys.keySet(), + null, + deserializePortable, + filters); + } + } } else { // If we are here, then there were active transactions for some entries @@ -342,12 +370,30 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (e != null) throw new GridClosureException(e); - if (reload) - return cache().reloadAllAsync(keys.keySet(), true, subjId, taskName, filters); - else - return tx == null ? - cache().getDhtAllAsync(keys.keySet(), subjId, taskName, deserializePortable, filters) : - tx.getAllAsync(cctx, keys.keySet(), null, deserializePortable, filters); + if (reload && cctx.isStoreEnabled() && cctx.store().configured()) { + return cache().reloadAllAsync(keys.keySet(), + true, + subjId, + taskName, + filters); + } + else { + if (tx == null) { + return cache().getDhtAllAsync(keys.keySet(), + subjId, + taskName, + deserializePortable, + filters, + expiryPlc); + } + else { + return tx.getAllAsync(cctx, + keys.keySet(), + null, + deserializePortable, + filters); + } + } } }, cctx.kernalContext()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index a836ff8..ee989a5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -320,6 +320,20 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); + if (!tx.pessimistic()) { + int idx = 0; + + for (GridCacheTxEntry<K, V> e : dhtMapping.writes()) + req.ttl(idx++, e.ttl()); + + if (nearMapping != null) { + idx = 0; + + for (GridCacheTxEntry<K, V> e : nearMapping.writes()) + req.nearTtl(idx++, e.ttl()); + } + } + if (tx.onePhaseCommit()) req.writeVersion(tx.writeVersion()); @@ -377,6 +391,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); + if (!tx.pessimistic()) { + int idx = 0; + + for (GridCacheTxEntry<K, V> e : nearMapping.writes()) + req.nearTtl(idx++, e.ttl()); + } + if (tx.onePhaseCommit()) req.writeVersion(tx.writeVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index c5db862..f4946e9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -15,6 +15,7 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.gridgain.grid.util.*; import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; @@ -74,6 +75,12 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest @GridDirectVersion(2) private int taskNameHash; + /** TTLs for optimistic transaction. */ + private GridLongList ttls; + + /** TTLs for optimistic transaction. */ + private GridLongList nearTtls; + /** * Empty constructor required for {@link Externalizable}. */ @@ -93,6 +100,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest * @param commit Commit flag. * @param invalidate Invalidate flag. * @param sysInvalidate System invalidation flag. + * @param syncCommit Synchronous commit flag. + * @param syncRollback Synchronous rollback flag. * @param baseVer Base version. * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. @@ -103,6 +112,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest * @param recoverWrites Recovery write entries. * @param onePhaseCommit One phase commit flag. * @param grpLockKey Group lock key. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. */ public GridDhtTxFinishRequest( UUID nearNodeId, @@ -242,6 +253,56 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers; } + /** + * @param idx Entry index. + * @param ttl TTL. + */ + public void ttl(int idx, long ttl) { + if (ttl != -1L) { + if (ttls == null) { + ttls = new GridLongList(); + + for (int i = 0; i < idx - 1; i++) + ttls.add(-1L); + } + } + + if (ttls != null) + ttls.add(ttl); + } + + /** + * @return TTLs for optimistic transaction. + */ + public GridLongList ttls() { + return ttls; + } + + /** + * @param idx Entry index. + * @param ttl TTL. + */ + public void nearTtl(int idx, long ttl) { + if (ttl != -1L) { + if (nearTtls == null) { + nearTtls = new GridLongList(); + + for (int i = 0; i < idx - 1; i++) + nearTtls.add(-1L); + } + } + + if (nearTtls != null) + nearTtls.add(ttl); + } + + /** + * @return TTLs for optimistic transaction. + */ + public GridLongList nearTtls() { + return nearTtls; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { @@ -304,6 +365,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest _clone.writeVer = writeVer; _clone.subjId = subjId; _clone.taskNameHash = taskNameHash; + _clone.ttls = ttls; + _clone.nearTtls = nearTtls; } /** {@inheritDoc} */ @@ -430,6 +493,18 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; + case 31: + if (!commState.putLongList(ttls)) + return false; + + commState.idx++; + + case 32: + if (!commState.putLongList(nearTtls)) + return false; + + commState.idx++; + } return true; @@ -584,6 +659,26 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; + case 31: + GridLongList ttls0 = commState.getLongList(); + + if (ttls0 == LONG_LIST_NOT_READ) + return false; + + ttls = ttls0; + + commState.idx++; + + case 32: + GridLongList nearTtls0 = commState.getLongList(); + + if (nearTtls0 == LONG_LIST_NOT_READ) + return false; + + nearTtls = nearTtls0; + + commState.idx++; + } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java index 3bc41b2..2ac605b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -276,8 +276,13 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param drVer Data center replication version. * @param clos Transform closures. */ - public void addWrite(GridCacheContext<K, V> cacheCtx, GridCacheOperation op, GridCacheTxKey<K> key, byte[] keyBytes, - @Nullable V val, @Nullable byte[] valBytes, @Nullable Collection<IgniteClosure<V, V>> clos, + public void addWrite(GridCacheContext<K, V> cacheCtx, + GridCacheOperation op, + GridCacheTxKey<K> key, + byte[] keyBytes, + @Nullable V val, + @Nullable byte[] valBytes, + @Nullable Collection<IgniteClosure<V, V>> clos, @Nullable GridCacheVersion drVer) { checkInternal(key); @@ -286,7 +291,14 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), topologyVersion()); - GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached, drVer); + GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, + this, + op, + val, + -1L, + -1L, + cached, + drVer); txEntry.keyBytes(keyBytes); txEntry.valueBytes(valBytes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 34539d4..4a3491d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -88,8 +88,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** Whether to deserialize portable objects. */ private boolean deserializePortable; - /** */ - private GridCacheAccessExpiryPolicy expiry; + /** Expiry policy. */ + private GridCacheAccessExpiryPolicy expiryPlc; /** * Empty constructor required for {@link Externalizable}. @@ -104,8 +104,12 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param topVer Topology version. * @param reload Reload flag. * @param forcePrimary If {@code true} then will force network trip to primary node even - * if called on backup node. + * if called on backup node. * @param filters Filters. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. */ public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, @@ -117,7 +121,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable GridCacheAccessExpiryPolicy expiry + @Nullable GridCacheAccessExpiryPolicy expiryPlc ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); @@ -132,7 +136,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M this.subjId = subjId; this.deserializePortable = deserializePortable; this.taskName = taskName; - this.expiry = expiry; + this.expiryPlc = expiryPlc; futId = IgniteUuid.randomUuid(); @@ -233,6 +237,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (trackable) cctx.mvcc().removeFuture(this); + cache().sendTtlUpdateRequest(expiryPlc); + return true; } @@ -304,7 +310,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M subjId, taskName == null ? 0 : taskName.hashCode(), deserializePortable, - filters); + filters, + expiryPlc); final Collection<Integer> invalidParts = fut.invalidPartitions(); @@ -356,7 +363,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M filters, subjId, taskName == null ? 0 : taskName.hashCode(), - expiry != null ? expiry.ttl() : -1L); + expiryPlc != null ? expiryPlc.ttl() : -1L); add(fut); // Append new future. @@ -417,7 +424,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M null, taskName, filters, - null); + expiryPlc); colocated.context().evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1d49097..3b07fc0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -275,10 +275,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, forcePrimary, filter, - expiryPlc, subjId0, taskName, - deserializePortable); + deserializePortable, + expiryPlc); } }); } @@ -705,20 +705,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param reload Reload flag. * @param forcePrimary Force primary flag. * @param filter Filter. - * @param expiryPlc Expiry policy. * @param subjId Subject ID. * @param taskName Task name. * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. * @return Get future. */ private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload, boolean forcePrimary, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable ExpiryPolicy expiryPlc, UUID subjId, String taskName, - boolean deserializePortable) { + boolean deserializePortable, + @Nullable ExpiryPolicy expiryPlc) { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); if (F.isEmpty(keys)) @@ -813,32 +813,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (success) { - if (expiry != null && expiry.request() != null) { - ctx.closures().runLocalSafe(new Runnable() { - @Override public void run() { - try { - GridCacheTtlUpdateRequest<K, V> req = expiry.request(); - - assert req != null; - assert !F.isEmpty(req.keys()); - - Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1); - - req.cacheId(ctx.cacheId()); - - ctx.io().safeSend(nodes, req, null); - } - catch (IgniteCheckedException e) { - log.error("Failed to send TTL update request.", e); - } - } - }); - } + sendTtlUpdateRequest(expiry); return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals)); } } + if (expiry != null) + expiry.reset(); + // Either reload or not all values are available locally. GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 84f3165..6e9f921 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -24,6 +24,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; @@ -169,9 +170,19 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); - subjId = ctx.subjectIdPerCall(subjId); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); - return loadAsync(keys, false, forcePrimary, topVer, subjId, taskName, deserializePortable, filter); + subjId = ctx.subjectIdPerCall(subjId, prj); + + return loadAsync(keys, + false, + forcePrimary, + topVer, + subjId, + taskName, + deserializePortable, + filter, + prj != null ? prj.expiry() : null); } /** {@inheritDoc} */ @@ -212,6 +223,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param taskName Task name. * @param deserializePortable Deserialize portable flag. * @param filter Filter. + * @param expiryPlc Expiry policy. * @return Loaded values. */ public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, @@ -221,13 +233,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable UUID subjId, String taskName, boolean deserializePortable, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable ExpiryPolicy expiryPlc) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); if (keyCheck) validateCacheKeys(keys); + final GridCacheAccessExpiryPolicy expiry = + GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f); @@ -258,7 +274,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte null, taskName, filter, - null); + expiry); // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -306,10 +322,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte break; } - if (success) + if (success) { + sendTtlUpdateRequest(expiry); + return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals)); + } } + if (expiry != null) + expiry.reset(); + // Either reload or not all values are available locally. GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, @@ -320,7 +342,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId, taskName, deserializePortable, - null); + expiry); fut.init(); @@ -332,9 +354,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * * {@inheritDoc} */ - @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - @Nullable GridCacheTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval, - @Nullable GridCacheTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, + long timeout, + @Nullable GridCacheTxLocalEx<K, V> tx, + boolean isInvalidate, + boolean isRead, + boolean retval, + @Nullable GridCacheTxIsolation isolation, + IgnitePredicate<GridCacheEntry<K, V>>[] filter) { assert tx == null || tx instanceof GridNearTxLocal; GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java index db1a058..6c2fa8c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -248,9 +248,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param filter Filter. * @return Loaded values. */ - public IgniteFuture<Map<K, V>> loadAsync(@Nullable GridCacheTxEx tx, @Nullable Collection<? extends K> keys, - boolean reload, boolean forcePrimary, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, - @Nullable UUID subjId, String taskName, boolean deserializePortable) { + public IgniteFuture<Map<K, V>> loadAsync(@Nullable GridCacheTxEx tx, + @Nullable Collection<? extends K> keys, + boolean reload, + boolean forcePrimary, + @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); @@ -259,6 +264,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null; + // TODO IGNITE-41. + GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, reload, @@ -267,7 +274,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda filter, subjId, taskName, - deserializePortable); + deserializePortable, + null); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java index d23236a..b5d5e29 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; @@ -91,6 +90,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma /** Whether to deserialize portable objects. */ private boolean deserializePortable; + /** Expiry policy. */ + private GridCacheAccessExpiryPolicy expiryPlc; + /** * Empty constructor required for {@link Externalizable}. */ @@ -106,6 +108,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * called on backup node. * @param tx Transaction. * @param filters Filters. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. */ public GridNearGetFuture( GridCacheContext<K, V> cctx, @@ -116,11 +122,11 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters, @Nullable UUID subjId, String taskName, - boolean deserializePortable + boolean deserializePortable, + @Nullable GridCacheAccessExpiryPolicy expiryPlc ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); - assert cctx != null; assert !F.isEmpty(keys); this.cctx = cctx; @@ -132,6 +138,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma this.subjId = subjId; this.taskName = taskName; this.deserializePortable = deserializePortable; + this.expiryPlc = expiryPlc; futId = IgniteUuid.randomUuid(); @@ -292,8 +299,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma // If this is the primary or backup node for the keys. if (n.isLocal()) { final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut = - dht().getDhtAsync(n.id(), -1, mappedKeys, reload, topVer, subjId, - taskName == null ? 0 : taskName.hashCode(), deserializePortable, filters); + dht().getDhtAsync(n.id(), + -1, + mappedKeys, + reload, + topVer, + subjId, + taskName == null ? 0 : taskName.hashCode(), + deserializePortable, + filters, + expiryPlc); final Collection<Integer> invalidParts = fut.invalidPartitions(); @@ -351,7 +366,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma filters, subjId, taskName == null ? 0 : taskName.hashCode(), - -1L); + expiryPlc != null ? expiryPlc.ttl() : -1L); add(fut); // Append new future. @@ -406,7 +421,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma null, taskName, filters, - null); + expiryPlc); ClusterNode primary = null; @@ -432,7 +447,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma null, taskName, filters, - null); + expiryPlc); // Entry was not in memory or in swap, so we remove it from cache. if (v == null && isNew && entry.markObsoleteIfEmpty(ver)) @@ -549,10 +564,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * @param keys Keys. * @param infos Entry infos. * @param savedVers Saved versions. + * @param topVer Topology version * @return Result map. */ - private Map<K, V> loadEntries(UUID nodeId, Collection<K> keys, Collection<GridCacheEntryInfo<K, V>> infos, - Map<K, GridCacheVersion> savedVers, long topVer) { + private Map<K, V> loadEntries(UUID nodeId, + Collection<K> keys, + Collection<GridCacheEntryInfo<K, V>> infos, + Map<K, GridCacheVersion> savedVers, + long topVer) { boolean empty = F.isEmpty(keys); Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java index 552012b..b6765fb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java @@ -90,6 +90,9 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements * @param reload Reload flag. * @param topVer Topology version. * @param filter Filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. + * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged. */ public GridNearGetRequest( int cacheId, @@ -186,6 +189,13 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements } /** + * @return New TTL to set after entry is accessed, -1 to leave unchanged. + */ + public long accessTtl() { + return accessTtl; + } + + /** * @param ctx Cache context. * @throws IgniteCheckedException If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java index 9b4d117..c8d90e2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -119,12 +119,22 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @param filter Filter. * @return Future. */ - IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx, @Nullable Collection<? extends K> keys, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, boolean deserializePortable) { + IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx, + @Nullable Collection<? extends K> keys, + @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, + boolean deserializePortable) { assert tx != null; - GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, false, false, tx, filter, - CU.subjectId(tx, ctx.shared()), tx.resolveTaskName(), deserializePortable); + GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, + keys, + false, + false, + tx, + filter, + CU.subjectId(tx, ctx.shared()), + tx.resolveTaskName(), + deserializePortable, + null); // init() will register future for responses if it has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java index 2b974e9..13233c2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java @@ -286,7 +286,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { } else if (cacheCtx.isColocated()) { return cacheCtx.colocated().loadAsync(keys, /*reload*/false, /*force primary*/false, topologyVersion(), - CU.subjectId(this, cctx), resolveTaskName(), deserializePortable, null) + CU.subjectId(this, cctx), resolveTaskName(), deserializePortable, null, null) .chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() { @Override public Boolean apply(IgniteFuture<Map<K, V>> f) { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 30b7aef..0cc5001 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -75,6 +75,8 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ * @param txNodes Transaction nodes mapping. * @param last {@code True} if this last prepare request for node. * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. */ public GridNearTxPrepareRequest( IgniteUuid futId, @@ -202,6 +204,11 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ } /** {@inheritDoc} */ + @Override protected boolean transferExpiryPolicy() { + return true; + } + + /** {@inheritDoc} */ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) @Override public GridTcpCommunicationMessageAdapter clone() { GridNearTxPrepareRequest _clone = new GridNearTxPrepareRequest(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java index aa3546b..9cc8552 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java @@ -344,7 +344,13 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> return false; } else { - GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached, + GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, + this, + op, + val, + -1L, + -1L, + cached, drVer); txEntry.keyBytes(keyBytes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index 6eda650..d901311 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -573,16 +573,26 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { boolean deserializePortable) throws IgniteCheckedException { ctx.checkSecurity(GridSecurityPermission.CACHE_READ); - UUID subjId = ctx.subjectIdPerCall(null); - if (F.isEmpty(keys)) return Collections.emptyMap(); + GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall(); + + UUID subjId = ctx.subjectIdPerCall(null, prj); + + ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null; + + if (expiryPlc == null) + expiryPlc = ctx.expiry(); + Map<K, V> vals = new HashMap<>(keys.size(), 1.0f); if (keyCheck) validateCacheKeys(keys); + final GridCacheAccessExpiryPolicy expiry = + GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry()); + boolean success = true; for (K key : keys) { @@ -608,7 +618,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { null, taskName, filter, - null); + expiry); if (v != null) vals.put(key, v); @@ -653,7 +663,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { return map; } - return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, filter).get(); + return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, expiry, filter).get(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java index 9293431..e1e54a6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java @@ -18,6 +18,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.gridgain.grid.cache.*; import org.gridgain.testframework.junits.common.*; +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; import static org.gridgain.grid.cache.GridCacheMode.*; import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; @@ -89,6 +90,13 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { cfg.setSwapEnabled(swapEnabled()); cfg.setCacheMode(cacheMode()); cfg.setAtomicityMode(atomicityMode()); + + if (atomicityMode() == ATOMIC && cacheMode() != LOCAL) { + assert atomicWriteOrderMode() != null; + + cfg.setAtomicWriteOrderMode(atomicWriteOrderMode()); + } + cfg.setWriteSynchronizationMode(writeSynchronization()); cfg.setDistributionMode(distributionMode()); cfg.setPortableEnabled(portableEnabled()); @@ -110,6 +118,13 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { protected abstract GridCacheAtomicityMode atomicityMode(); /** + * @return Atomic cache write order mode. + */ + protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return null; + } + + /** * @return Partitioned mode. */ protected abstract GridCacheDistributionMode distributionMode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java index 251bc05..9e7dff5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java @@ -11,6 +11,7 @@ package org.apache.ignite.internal.processors.cache.expiry; import org.gridgain.grid.cache.*; +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; import static org.gridgain.grid.cache.GridCacheDistributionMode.*; import static org.gridgain.grid.cache.GridCacheMode.*; @@ -35,6 +36,11 @@ public class IgniteCacheAtomicExpiryPolicyTest extends IgniteCacheExpiryPolicyAb } /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return CLOCK; + } + + /** {@inheritDoc} */ @Override protected GridCacheDistributionMode distributionMode() { return PARTITIONED_ONLY; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java new file mode 100644 index 0000000..dec3a05 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java @@ -0,0 +1,24 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.gridgain.grid.cache.*; + +import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*; + +/** + * + */ +public class IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest { + /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } +}
