Repository: ignite Updated Branches: refs/heads/ignite-single-op-tx [created] f09d09fe5
'Single' operations optimizations for tx cache. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f09d09fe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f09d09fe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f09d09fe Branch: refs/heads/ignite-single-op-tx Commit: f09d09fe5e6f29d4bd10090db6247f219f780954 Parents: 5887ae4 Author: sboikov <[email protected]> Authored: Fri Nov 13 14:10:44 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 13 14:10:44 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 2 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 7 +- .../transactions/IgniteTxLocalAdapter.java | 1225 +++++++++++------- .../cache/transactions/IgniteTxLocalEx.java | 15 + .../cache/transactions/IgniteTxMap.java | 3 +- 5 files changed, 765 insertions(+), 487 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 419ccec..49ca1dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1948,7 +1948,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Boolean stored = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, filter).get().success(); + return tx.putAsync(ctx, key, val, false, filter).get().success(); } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 6de8795..0869b90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -705,7 +705,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { passedKeys, read, needRetVal, - skipped, accessTtl, null, skipStore); @@ -723,7 +722,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param passedKeys Passed keys. * @param read {@code True} if read. * @param needRetVal Return value flag. - * @param skipped Skipped keys. * @param accessTtl TTL for read operation. * @param filter Entry write filter. * @param skipStore Skip store flag. @@ -735,13 +733,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { final Collection<KeyCacheObject> passedKeys, final boolean read, final boolean needRetVal, - final Set<KeyCacheObject> skipped, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, boolean skipStore) { if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" + - skipped + ']'); + log.debug("Before acquiring transaction lock on keys [keys=" + passedKeys + ']'); if (passedKeys.isEmpty()) return new GridFinishedFuture<>(ret); @@ -768,7 +764,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { postLockWrite(cacheCtx, passedKeys, - skipped, ret, /*remove*/false, /*retval*/false, http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 2c7bf8a..ada2538 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1582,25 +1582,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** - * Adds skipped key. - * - * @param skipped Skipped set (possibly {@code null}). - * @param key Key to add. - * @return Skipped set. - */ - private Set<KeyCacheObject> skip(Set<KeyCacheObject> skipped, KeyCacheObject key) { - if (skipped == null) - skipped = new GridLeanSet<>(); - - skipped.add(key); - - if (log.isDebugEnabled()) - log.debug("Added key to skipped set: " + key); - - return skipped; - } - - /** * Loads all missed keys for * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method. * @@ -1954,6 +1935,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ + @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync( + GridCacheContext cacheCtx, + K key, + V val, + boolean retval, + CacheEntryPredicate[] filter) { + return putAsync0(cacheCtx, key, val, retval, filter); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> putAllDrAsync( GridCacheContext cacheCtx, Map<KeyCacheObject, GridCacheDrInfo> drMap @@ -2009,12 +2000,85 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @param cacheKey Key to enlist. + * @param val Value. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param entryProcessor Entry processor (for invoke operation). + * @param invokeArgs Optional arguments for EntryProcessor. + * @param retval Flag indicating whether a value should be returned. + * @param lockOnly If {@code true}, then entry will be enlisted as noop. + * @param filter User filters. + * @param ret Return value. + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @return Future for entry values loading. + */ + private <K, V> IgniteInternalFuture<Void> enlistWrite( + final GridCacheContext cacheCtx, + KeyCacheObject cacheKey, + Object val, + @Nullable ExpiryPolicy expiryPlc, + @Nullable EntryProcessor<K, V, Object> entryProcessor, + @Nullable Object[] invokeArgs, + final boolean retval, + boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + boolean skipStore, + final boolean singleRmv) { + try { + addActiveCache(cacheCtx); + + final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + final boolean needVal = singleRmv || retval || hasFilters; + final boolean needReadVer = needVal && (serializable() && optimistic()); + + boolean loadMissed = enlistWriteEntry(cacheCtx, + cacheKey, + val, + entryProcessor, + invokeArgs, + expiryPlc, + retval, + lockOnly, + filter, + /*drVer*/null, + /*drTtl*/-1L, + /*drExpireTime*/-1L, + ret, + /*enlisted*/null, + skipStore, + singleRmv, + hasFilters, + needVal, + needReadVer); + + if (loadMissed) { + return loadMissing(cacheCtx, + Collections.singleton(cacheKey), + filter, + ret, + needReadVer, + singleRmv, + hasFilters, + skipStore, + retval); + } + + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + /** * Internal routine for <tt>putAll(..)</tt> * * @param cacheCtx Cache context. * @param keys Keys to enlist. * @param expiryPlc Explicitly specified expiry policy for entry. - * @param implicit Implicit flag. * @param lookup Value lookup map ({@code null} for remove). * @param invokeMap Map with entry processors for invoke operation. * @param invokeArgs Optional arguments for EntryProcessor. @@ -2027,13 +2091,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param drRmvMap DR remove map (optional). * @param skipStore Skip store flag. * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. - * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). + * @return Future for missing values loading. */ - private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite( + private <K, V> IgniteInternalFuture<Void> enlistWrite( final GridCacheContext cacheCtx, Collection<?> keys, @Nullable ExpiryPolicy expiryPlc, - boolean implicit, @Nullable Map<?, ?> lookup, @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap, @Nullable Object[] invokeArgs, @@ -2056,8 +2119,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return new GridFinishedFuture<>(e); } - Set<KeyCacheObject> skipped = null; - boolean rmv = lookup == null && invokeMap == null; Set<KeyCacheObject> missedForLoad = null; @@ -2115,345 +2176,441 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); - IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - - IgniteTxEntry txEntry = entry(txKey); - - // First time access. - if (txEntry == null) { - while (true) { - GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); - - try { - entry.unswap(false); - - // Check if lock is being explicitly acquired by the same thread. - if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && - entry.lockedByThread(threadId, xidVer)) - throw new IgniteCheckedException("Cannot access key within transaction if lock is " + - "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer + - ", threadId=" + threadId + - ", locNodeId=" + cctx.localNodeId() + ']'); - - CacheObject old = null; - GridCacheVersion readVer = null; + boolean loadMissed = enlistWriteEntry(cacheCtx, + cacheKey, + val, + entryProcessor, + invokeArgs, + expiryPlc, + retval, + lockOnly, + filter, + drVer, + drTtl, + drExpireTime, + ret, + enlisted, + skipStore, + singleRmv, + hasFilters, + needVal, + needReadVer); + + if (loadMissed) { + if (missedForLoad == null) + missedForLoad = new HashSet<>(); + + missedForLoad.add(cacheKey); + } + } - if (optimistic() && !implicit()) { - try { - if (needReadVer) { - T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? - entry.innerGetVersioned(this, - /*swap*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null) : null; + if (missedForLoad != null) { + return loadMissing(cacheCtx, + missedForLoad, + filter, + ret, + needReadVer, + singleRmv, + hasFilters, + skipStore, + retval); + } - if (res != null) { - old = res.get1(); - readVer = res.get2(); - } - } - else { - old = entry.innerGet(this, - /*swap*/false, - /*read-through*/false, - /*fail-fast*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - /*temporary*/false, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null); - } - } - catch (ClusterTopologyCheckedException e) { - entry.context().evicts().touch(entry, topologyVersion()); + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } - throw e; - } - } - else - old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); + /** + * @param cacheCtx Cache context. + * @param keys Keys to load. + * @param ret Return value. + * @param needReadVer Read version flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param skipStore Skip store flag. + * @param retval Return value flag. + * @return Load future. + */ + private IgniteInternalFuture<Void> loadMissing( + final GridCacheContext cacheCtx, + final Set<KeyCacheObject> keys, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + final boolean needReadVer, + final boolean singleRmv, + final boolean hasFilters, + final boolean skipStore, + final boolean retval) { + GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c = + new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { + @Override public void apply(KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); - if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { - skipped = skip(skipped, cacheKey); + IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); - ret.set(cacheCtx, old, false); + assert e != null; - if (!readCommitted()) { - // Enlist failed filters as reads for non-read-committed mode, - // so future ops will get the same values. - txEntry = addEntry(READ, - old, - null, - null, - entry, - null, - CU.empty0(), - false, - -1L, - -1L, - null, - skipStore); + if (needReadVer) { + assert loadVer != null; - txEntry.markValid(); + e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + } - if (needReadVer) { - assert readVer != null; + if (singleRmv) { + assert !hasFilters && !retval; + assert val == null || Boolean.TRUE.equals(val) : val; - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } - } + ret.set(cacheCtx, null, val != null); + } + else { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - if (readCommitted()) - cacheCtx.evicts().touch(entry, topologyVersion()); + if (e.op() == TRANSFORM) { + GridCacheVersion ver; - break; // While. + try { + ver = e.cached().version(); } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; - final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : - entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore); + ver = null; + } - if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) - cacheCtx.evicts().touch(entry, topologyVersion()); + addInvokeResult(e, cacheVal, ret, ver); + } + else { + boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); - enlisted.add(cacheKey); + ret.set(cacheCtx, cacheVal, success); + } + } + } + }; - if (!pessimistic() && !implicit()) { - txEntry.markValid(); + return loadMissing( + cacheCtx, + /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, + /*async*/true, + keys, + /*skipVals*/singleRmv, + needReadVer, + c); + } - if (old == null) { - if (needVal) { - if (missedForLoad == null) - missedForLoad = new HashSet<>(); + /** + * @param cacheCtx Cache context. + * @param cacheKey Key. + * @param val Value. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param retval Return value flag. + * @param lockOnly + * @param filter Filter. + * @param drVer DR version. + * @param drTtl DR ttl. + * @param drExpireTime DR expire time. + * @param ret Return value. + * @param enlisted Enlisted keys collection. + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param needVal {@code True} if value is needed. + * @param needReadVer {@code True} if need read entry version. + * @return {@code True} if entry value should be loaded. + * @throws IgniteCheckedException If failed. + */ + private boolean enlistWriteEntry(GridCacheContext cacheCtx, + final KeyCacheObject cacheKey, + final @Nullable Object val, + final @Nullable EntryProcessor<?, ?, ?> entryProcessor, + final @Nullable Object[] invokeArgs, + final @Nullable ExpiryPolicy expiryPlc, + final boolean retval, + final boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheVersion drVer, + final long drTtl, + long drExpireTime, + final GridCacheReturn ret, + @Nullable final Collection<KeyCacheObject> enlisted, + boolean skipStore, + boolean singleRmv, + boolean hasFilters, + final boolean needVal, + boolean needReadVer + ) throws IgniteCheckedException { + boolean loadMissed = false; - missedForLoad.add(cacheKey); - } - else { - assert !implicit() || !transform : this; - assert txEntry.op() != TRANSFORM : txEntry; + final boolean rmv = val == null && entryProcessor == null; - if (retval) - ret.set(cacheCtx, null, true); - else - ret.success(true); - } - } - else { - if (needReadVer) { - assert readVer != null; + IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } + IgniteTxEntry txEntry = entry(txKey); - if (retval && !transform) - ret.set(cacheCtx, old, true); - else { - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; + // First time access. + if (txEntry == null) { + while (true) { + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : txEntry; + try { + entry.unswap(false); + + // Check if lock is being explicitly acquired by the same thread. + if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && + entry.lockedByThread(threadId, xidVer)) { + throw new IgniteCheckedException("Cannot access key within transaction if lock is " + + "externally held [key=" + CU.value(cacheKey, cacheCtx, false) + + ", entry=" + entry + + ", xidVer=" + xidVer + + ", threadId=" + threadId + + ", locNodeId=" + cctx.localNodeId() + ']'); + } - if (log.isDebugEnabled()) - log.debug("Failed to get entry version " + - "[err=" + ex.getMessage() + ']'); + CacheObject old = null; + GridCacheVersion readVer = null; - ver = null; - } + if (optimistic() && !implicit()) { + try { + if (needReadVer) { + T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? + entry.innerGetVersioned(this, + /*swap*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null) : null; - addInvokeResult(txEntry, old, ret, ver); - } - else - ret.success(true); - } + if (res != null) { + old = res.get1(); + readVer = res.get2(); } } - // Pessimistic. else { - if (retval && !transform) - ret.set(cacheCtx, old, true); - else - ret.success(true); + old = entry.innerGet(this, + /*swap*/false, + /*read-through*/false, + /*fail-fast*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + /*temporary*/false, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null); } - - break; // While. } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction putAll0 method: " + entry); + catch (ClusterTopologyCheckedException e) { + entry.context().evicts().touch(entry, topologyVersion()); + + throw e; } } - } - else { - if (entryProcessor == null && txEntry.op() == TRANSFORM) - throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + - "transaction after EntryProcessor is applied): " + key); + else + old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); - GridCacheEntryEx entry = txEntry.cached(); + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { + ret.set(cacheCtx, old, false); - CacheObject v = txEntry.value(); - - boolean del = txEntry.op() == DELETE && rmv; + if (!readCommitted()) { + // Enlist failed filters as reads for non-read-committed mode, + // so future ops will get the same values. + txEntry = addEntry(READ, + old, + null, + null, + entry, + null, + CU.empty0(), + false, + -1L, + -1L, + null, + skipStore); - if (!del) { - if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { - skipped = skip(skipped, cacheKey); + txEntry.markValid(); - ret.set(cacheCtx, v, false); + if (needReadVer) { + assert readVer != null; - continue; + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + } } - GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : - v != null ? UPDATE : CREATE; + if (readCommitted()) + cacheCtx.evicts().touch(entry, topologyVersion()); - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore); + break; // While. + } + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + + txEntry = addEntry(op, + cacheCtx.toCacheObject(val), + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore); + + if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) + cacheCtx.evicts().touch(entry, topologyVersion()); + + if (enlisted != null) enlisted.add(cacheKey); - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; + if (!pessimistic() && !implicit()) { + txEntry.markValid(); - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException e) { - assert optimistic() : txEntry; + if (old == null) { + if (needVal) + loadMissed = true; + else { + assert !implicit() || !transform : this; + assert txEntry.op() != TRANSFORM : txEntry; - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + if (retval) + ret.set(cacheCtx, null, true); + else + ret.success(true); + } + } + else { + if (needReadVer) { + assert readVer != null; - ver = null; + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); } - addInvokeResult(txEntry, txEntry.value(), ret, ver); - } - } + if (retval && !transform) + ret.set(cacheCtx, old, true); + else { + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; - if (!pessimistic()) { - txEntry.markValid(); + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : txEntry; + + if (log.isDebugEnabled()) + log.debug("Failed to get entry version " + + "[err=" + ex.getMessage() + ']'); + ver = null; + } + + addInvokeResult(txEntry, old, ret, ver); + } + else + ret.success(true); + } + } + } + // Pessimistic. + else { if (retval && !transform) - ret.set(cacheCtx, v, true); + ret.set(cacheCtx, old, true); else ret.success(true); } + + break; // While. + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction putAll0 method: " + entry); } } } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } + else { + if (entryProcessor == null && txEntry.op() == TRANSFORM) + throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + + "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false)); - if (missedForLoad != null) { - final boolean skipVals = singleRmv; + GridCacheEntryEx entry = txEntry.cached(); - IgniteInternalFuture<Void> fut = loadMissing( - cacheCtx, - /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, - /*async*/true, - missedForLoad, - skipVals, - needReadVer, - new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { - @Override public void apply(KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer) { - if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); + CacheObject v = txEntry.value(); - IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); + boolean del = txEntry.op() == DELETE && rmv; - assert e != null; + if (!del) { + if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { + ret.set(cacheCtx, v, false); - if (needReadVer) { - assert loadVer != null; + return loadMissed; + } - e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); - } + GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : + v != null ? UPDATE : CREATE; - if (singleRmv) { - assert !hasFilters && !retval; - assert val == null || Boolean.TRUE.equals(val) : val; + txEntry = addEntry(op, + cacheCtx.toCacheObject(val), + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore); - ret.set(cacheCtx, null, val != null); - } - else { - CacheObject cacheVal = cacheCtx.toCacheObject(val); + if (enlisted != null) + enlisted.add(cacheKey); - if (e.op() == TRANSFORM) { - GridCacheVersion ver; + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; - try { - ver = e.cached().version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : e; + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - ver = null; - } + ver = null; + } - addInvokeResult(e, cacheVal, ret, ver); - } - else { - boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); + addInvokeResult(txEntry, txEntry.value(), ret, ver); + } + } - ret.set(cacheCtx, cacheVal, success); - } - } - } - }); - - return new GridEmbeddedFuture<>( - new C2<Void, Exception, Set<KeyCacheObject>>() { - @Override public Set<KeyCacheObject> apply(Void b, Exception e) { - if (e != null) - throw new GridClosureException(e); + if (!pessimistic()) { + txEntry.markValid(); - return Collections.emptySet(); - } - }, fut - ); + if (retval && !transform) + ret.set(cacheCtx, v, true); + else + ret.success(true); + } } - return new GridFinishedFuture<>(skipped); + return loadMissed; } /** @@ -2486,22 +2643,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * * @param cacheCtx Context. * @param keys Keys. - * @param failed Collection of potentially failed keys (need to populate in this method). * @param ret Return value. * @param rmv {@code True} if remove. * @param retval Flag to return value or not. * @param read {@code True} if read. * @param accessTtl TTL for read operation. * @param filter Filter to check entries. - * @return Failed keys. * @throws IgniteCheckedException If error. * @param computeInvoke If {@code true} computes return value for invoke operation. */ @SuppressWarnings("unchecked") - protected Set<KeyCacheObject> postLockWrite( + protected final void postLockWrite( GridCacheContext cacheCtx, Iterable<KeyCacheObject> keys, - Set<KeyCacheObject> failed, GridCacheReturn ret, boolean rmv, boolean retval, @@ -2606,8 +2760,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter log.debug("Filter passed in post lock for key: " + k); } else { - failed = skip(failed, k); - // Revert operation to previous. (if no - NOOP, so entry will be unlocked). txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value())); txEntry.filters(CU.empty0()); @@ -2638,11 +2790,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } } - - if (log.isDebugEnabled()) - log.debug("Entries that failed after lock filter check: " + failed); - - return failed; } /** @@ -2696,6 +2843,140 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @param retval Return value flag. + * @throws IgniteCheckedException If failed. + */ + private void beforePut(GridCacheContext cacheCtx, boolean retval) throws IgniteCheckedException { + checkUpdatesAllowed(cacheCtx); + + cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT); + + if (retval) + needReturnValue(true); + + checkValid(); + + init(); + } + + /** + * Internal method for single update operation. + * + * @param cacheCtx Cache context. + * @param key Key. + * @param val Value. + * @param retval Return value flag. + * @param filter Filter. + * @return Operation future. + */ + private <K, V> IgniteInternalFuture putAsync0( + final GridCacheContext cacheCtx, + K key, + V val, + final boolean retval, + @Nullable final CacheEntryPredicate[] filter + ) { + assert key != null; + + try { + beforePut(cacheCtx, retval); + + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); + + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); + + final IgniteInternalFuture<Void> loadFut = enlistWrite( + cacheCtx, + cacheKey, + val, + opCtx != null ? opCtx.expiry() : null, + null, + null, + retval, + /*lockOnly*/false, + filter, + ret, + opCtx != null && opCtx.skipStore(), + /*singleRmv*/false); + + if (pessimistic()) { + assert loadFut == null || loadFut.isDone() : loadFut; + + final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey); + + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for put on key: " + enlisted); + + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, + lockTimeout(), + this, + false, + retval, + isolation, + isInvalidate(), + -1L); + + PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { + @Override public GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for put on keys: " + enlisted); + + postLockWrite(cacheCtx, + enlisted, + ret, + /*remove*/false, + retval, + /*read*/false, + -1L, + filter, + /*computeInvoke*/true); + + return ret; + } + }; + + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { + try { + return nonInterruptable(plc1.apply(false, e)); + } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); + } + } + } + else { + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + } + else + return optimisticPutFuture(loadFut, ret); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + catch (RuntimeException e) { + onException(); + + throw e; + } + } + + /** * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} * maps must be non-null. * @@ -2721,17 +3002,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert filter == null || invokeMap == null; try { - checkUpdatesAllowed(cacheCtx); + beforePut(cacheCtx, retval); } catch (IgniteCheckedException e) { return new GridFinishedFuture(e); } - cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT); - - if (retval) - needReturnValue(true); - // Cached entry may be passed only from entry wrapper. final Map<?, ?> map0; final Map<?, EntryProcessor<K, V, Object>> invokeMap0; @@ -2757,15 +3033,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert map0 != null || invokeMap0 != null; - try { - checkValid(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - - init(); - final GridCacheReturn ret = new GridCacheReturn(localResult(), false); if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) { @@ -2783,15 +3050,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter try { Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); - Collection<KeyCacheObject> enlisted = new ArrayList<>(); + final Collection<KeyCacheObject> enlisted = new ArrayList<>(); CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite( + final IgniteInternalFuture<Void> loadFut = enlistWrite( cacheCtx, keySet, opCtx != null ? opCtx.expiry() : null, - implicit, map0, invokeMap0, invokeArgs, @@ -2806,15 +3072,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter false); if (pessimistic()) { - // Loose all skipped. - final Set<KeyCacheObject> loaded = loadFut.get(); - - final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded)); + assert loadFut == null || loadFut.isDone() : loadFut; if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock for put on keys: " + keys); + log.debug("Before acquiring transaction lock for put on keys: " + enlisted); - IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, lockTimeout(), this, false, @@ -2828,11 +3091,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter throws IgniteCheckedException { if (log.isDebugEnabled()) - log.debug("Acquired transaction lock for put on keys: " + keys); + log.debug("Acquired transaction lock for put on keys: " + enlisted); postLockWrite(cacheCtx, - keys, - loaded, + enlisted, ret, /*remove*/false, retval, @@ -2861,64 +3123,79 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } } - else + else { return nonInterruptable(new GridEmbeddedFuture<>( fut, plc1 )); + } } - else { - if (implicit()) { - // Should never load missing values for implicit transaction as values will be returned - // with prepare response, if required. - assert loadFut.isDone(); + else + return optimisticPutFuture(loadFut, ret); + } + catch (RuntimeException e) { + onException(); - try { - loadFut.get(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } + throw e; + } + } - return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { - try { - txFut.get(); + /** + * @param loadFut Missing keys load future. + * @param ret Future result. + * @return Future. + */ + private IgniteInternalFuture optimisticPutFuture(IgniteInternalFuture<Void> loadFut, final GridCacheReturn ret) { + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); - return implicitRes; - } - catch (IgniteCheckedException | RuntimeException e) { - rollbackAsync(); + try { + loadFut.get(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } - throw e; - } + return nonInterruptable(commitAsync().chain( + new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) + throws IgniteCheckedException { + try { + txFut.get(); + + return implicitRes; } - })); - } - else - return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException { - f.get(); + catch (IgniteCheckedException | RuntimeException e) { + rollbackAsync(); - return ret; + throw e; } - })); - } + } + } + )); } - catch (RuntimeException e) { - for (IgniteTxEntry txEntry : txMap.values()) { - GridCacheEntryEx cached0 = txEntry.cached(); - - if (cached0 != null) - txEntry.context().evicts().touch(cached0, topologyVersion()); - } + else { + return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) throws IgniteCheckedException { + f.get(); - throw e; + return ret; + } + })); } - catch (IgniteCheckedException e) { - setRollbackOnly(); + } - return new GridFinishedFuture<>(e); + /** + * + */ + private void onException() { + for (IgniteTxEntry txEntry : txMap.values()) { + GridCacheEntryEx cached0 = txEntry.cached(); + + if (cached0 != null) + txEntry.context().evicts().touch(cached0, topologyVersion()); } } @@ -2974,9 +3251,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert keys0 != null; - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Called removeAllAsync(...) [tx=" + this + ", keys=" + keys0 + ", implicit=" + implicit + ", retval=" + retval + "]"); + } try { checkValid(); @@ -3002,140 +3280,131 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter init(); - try { - Collection<KeyCacheObject> enlisted = new ArrayList<>(); + final Collection<KeyCacheObject> enlisted = new ArrayList<>(); - CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - ExpiryPolicy plc; + ExpiryPolicy plc; - if (!F.isEmpty(filter)) - plc = opCtx != null ? opCtx.expiry() : null; - else - plc = null; + if (!F.isEmpty(filter)) + plc = opCtx != null ? opCtx.expiry() : null; + else + plc = null; - final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite( - cacheCtx, - keys0, - plc, - implicit, - /** lookup map */null, - /** invoke map */null, - /** invoke arguments */null, - retval, - /** lock only */false, - filter, - ret, - enlisted, - null, - drMap, - opCtx != null && opCtx.skipStore(), - singleRmv - ); + final IgniteInternalFuture<Void> loadFut = enlistWrite( + cacheCtx, + keys0, + plc, + /** lookup map */null, + /** invoke map */null, + /** invoke arguments */null, + retval, + /** lock only */false, + filter, + ret, + enlisted, + null, + drMap, + opCtx != null && opCtx.skipStore(), + singleRmv + ); - if (log.isDebugEnabled()) - log.debug("Remove keys: " + enlisted); + if (log.isDebugEnabled()) + log.debug("Remove keys: " + enlisted); - // Acquire locks only after having added operation to the write set. - // Otherwise, during rollback we will not know whether locks need - // to be rolled back. - if (pessimistic()) { - // Loose all skipped. - final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get())); + // Acquire locks only after having added operation to the write set. + // Otherwise, during rollback we will not know whether locks need + // to be rolled back. + if (pessimistic()) { + assert loadFut.isDone() : loadFut; - if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys); - - IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, - lockTimeout(), - this, - false, - retval, - isolation, - isInvalidate(), - -1L); + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for remove on keys: " + enlisted); - PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { - @Override protected GridCacheReturn postLock(GridCacheReturn ret) - throws IgniteCheckedException - { - if (log.isDebugEnabled()) - log.debug("Acquired transaction lock for remove on keys: " + passedKeys); + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, + lockTimeout(), + this, + false, + retval, + isolation, + isInvalidate(), + -1L); + + PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for remove on keys: " + enlisted); - postLockWrite(cacheCtx, - passedKeys, - loadFut.get(), - ret, + postLockWrite(cacheCtx, + enlisted, + ret, /*remove*/true, - retval, + retval, /*read*/false, - -1L, - filter, + -1L, + filter, /*computeInvoke*/false); - return ret; - } - }; + return ret; + } + }; - if (fut.isDone()) { + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { try { - return nonInterruptable(plc1.apply(fut.get(), null)); - } - catch (GridClosureException e) { - return new GridFinishedFuture<>(e.unwrap()); + return nonInterruptable(plc1.apply(false, e)); } - catch (IgniteCheckedException e) { - try { - return nonInterruptable(plc1.apply(false, e)); - } - catch (Exception e1) { - return new GridFinishedFuture<>(e1); - } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); } } - else - return nonInterruptable(new GridEmbeddedFuture<>( - fut, - plc1 - )); } - else { - if (implicit()) { - // Should never load missing values for implicit transaction as values will be returned - // with prepare response, if required. - assert loadFut.isDone(); - - return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) - throws IgniteCheckedException { - try { - txFut.get(); + else + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + else { + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); - return implicitRes; - } - catch (IgniteCheckedException | RuntimeException e) { - rollbackAsync(); + return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) + throws IgniteCheckedException { + try { + txFut.get(); - throw e; - } + return implicitRes; } - })); - } - else - return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) - throws IgniteCheckedException { - f.get(); + catch (IgniteCheckedException | RuntimeException e) { + rollbackAsync(); - return ret; + throw e; } - })); + } + })); } - } - catch (IgniteCheckedException e) { - setRollbackOnly(); + else { + return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) + throws IgniteCheckedException { + f.get(); - return new GridFinishedFuture<>(e); + return ret; + } + })); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 0d83338..f9555cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -93,6 +93,21 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { /** * @param cacheCtx Cache context. + * @param key Key. + * @param val Value. + * @param retval Return value flag. + * @param filter Filter. + * @return Future for put operation. + */ + public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync( + GridCacheContext cacheCtx, + K key, + V val, + boolean retval, + CacheEntryPredicate[] filter); + + /** + * @param cacheCtx Cache context. * @param map Entry processors map. * @param invokeArgs Optional arguments for entry processor. * @return Transform operation future. http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java index 6408573..429c995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java @@ -170,8 +170,7 @@ public class IgniteTxMap extends AbstractMap<IgniteTxKey, IgniteTxEntry> impleme } /** {@inheritDoc} */ - @Nullable - @Override public IgniteTxEntry get(Object key) { + @Nullable @Override public IgniteTxEntry get(Object key) { IgniteTxEntry e = txMap.get(key); return e == null ? null : filter.apply(e) ? e : null;
