Repository: ignite Updated Branches: refs/heads/ignite-1607 59bf1a2e6 -> 183c19697
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/183c1969 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/183c1969 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/183c1969 Branch: refs/heads/ignite-1607 Commit: 183c19697056a8b2e3916dc044b9c8d656884834 Parents: 59bf1a2 Author: sboikov <[email protected]> Authored: Wed Oct 14 10:26:36 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 14 15:02:40 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 51 ++++------ .../processors/cache/GridCacheEntryEx.java | 3 +- .../dht/CacheDistributedGetFutureAdapter.java | 2 +- .../cache/distributed/dht/GridDhtGetFuture.java | 83 ++++++++------- .../dht/GridDhtTransactionalCacheAdapter.java | 101 +++++++++---------- .../distributed/dht/GridDhtTxPrepareFuture.java | 10 +- .../dht/GridPartitionedGetFuture.java | 8 -- .../dht/atomic/GridDhtAtomicCache.java | 4 - .../dht/colocated/GridDhtColocatedCache.java | 4 - .../distributed/near/GridNearGetFuture.java | 6 -- ...arOptimisticSerializableTxPrepareFuture.java | 4 +- .../local/atomic/GridLocalAtomicCache.java | 7 -- .../cache/transactions/IgniteTxAdapter.java | 95 ++++++++--------- .../transactions/IgniteTxLocalAdapter.java | 93 +++-------------- .../CacheSerializableTransactionsTest.java | 38 +++++-- .../cache/GridCacheAbstractFullApiSelfTest.java | 4 +- .../IgniteCacheCrossCacheTxFailoverTest.java | 13 ++- 17 files changed, 224 insertions(+), 302 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 6e5296a..8d2d8e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4649,41 +4649,34 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V boolean deserializePortable) throws IgniteCheckedException, GridCacheEntryRemovedException { - try { - CacheObject val = entry.innerGet( - null, - false, - false, - false, - true, - false, - false, - false, - null, - null, - null, - null); - - if (val == null) - return null; + CacheObject val = entry.innerGet( + null, + false, + false, + false, + true, + false, + false, + false, + null, + null, + null, + null); - KeyCacheObject key = entry.key(); + if (val == null) + return null; - Object key0 = key.value(ctx.cacheObjectContext(), true); - Object val0 = val.value(ctx.cacheObjectContext(), true); + KeyCacheObject key = entry.key(); - if (deserializePortable) { - key0 = ctx.unwrapPortableIfNeeded(key0, true); - val0 = ctx.unwrapPortableIfNeeded(val0, true); - } + Object key0 = key.value(ctx.cacheObjectContext(), true); + Object val0 = val.value(ctx.cacheObjectContext(), true); - return new CacheEntryImpl<>((K)key0, (V)val0, entry.version()); + if (deserializePortable) { + key0 = ctx.unwrapPortableIfNeeded(key0, true); + val0 = ctx.unwrapPortableIfNeeded(val0, true); } - catch (GridCacheFilterFailedException ignore) { - assert false; - return null; - } + return new CacheEntryImpl<>((K)key0, (V)val0, entry.version()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index c642b55..ebbc736 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -292,7 +292,6 @@ public interface GridCacheEntryEx { * @return Cached value. * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. - * @throws GridCacheFilterFailedException If filter failed. */ @Nullable public CacheObject innerGet(@Nullable IgniteInternalTx tx, boolean readSwap, @@ -306,7 +305,7 @@ public interface GridCacheEntryEx { Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc) - throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException; + throws IgniteCheckedException, GridCacheEntryRemovedException; /** * @param tx Cache transaction. http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 05f753c..4989a50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -168,7 +168,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun rdc.collect(key); - resC.apply(key, val, ver); + resC.apply(key, skipVals ? true : val, ver); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 1e722ba..c70440e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -294,56 +295,60 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col GridCompoundFuture<Boolean, Boolean> txFut = null; - for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) { - while (true) { - GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer); + ClusterNode readerNode = cctx.discovery().node(reader); - try { - GridCacheEntryInfo info = e.info(); + if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) { + for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) { + while (true) { + GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer); - // If entry is obsolete. - if (info == null) - continue; + try { + GridCacheEntryInfo info = e.info(); - boolean addReader = (!e.deleted() && k.getValue() && !skipVals); + // If entry is obsolete. + if (info == null) + continue; - if (addReader) - e.unswap(false); + boolean addReader = (!e.deleted() && k.getValue() && !skipVals); - // Register reader. If there are active transactions for this entry, - // then will wait for their completion before proceeding. - // TODO: GG-4003: - // TODO: What if any transaction we wait for actually removes this entry? - // TODO: In this case seems like we will be stuck with untracked near entry. - // TODO: To fix, check that reader is contained in the list of readers once - // TODO: again after the returned future completes - if not, try again. - // TODO: Also, why is info read before transactions are complete, and not after? - IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null; + if (addReader) + e.unswap(false); - if (f != null) { - if (txFut == null) - txFut = new GridCompoundFuture<>(CU.boolReducer()); + // Register reader. If there are active transactions for this entry, + // then will wait for their completion before proceeding. + // TODO: GG-4003: + // TODO: What if any transaction we wait for actually removes this entry? + // TODO: In this case seems like we will be stuck with untracked near entry. + // TODO: To fix, check that reader is contained in the list of readers once + // TODO: again after the returned future completes - if not, try again. + // TODO: Also, why is info read before transactions are complete, and not after? + IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null; - txFut.add(f); - } + if (f != null) { + if (txFut == null) + txFut = new GridCompoundFuture<>(CU.boolReducer()); - break; - } - catch (IgniteCheckedException err) { - return new GridFinishedFuture<>(err); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry when getting a DHT value: " + e); - } - finally { - cctx.evicts().touch(e, topVer); + txFut.add(f); + } + + break; + } + catch (IgniteCheckedException err) { + return new GridFinishedFuture<>(err); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry when getting a DHT value: " + e); + } + finally { + cctx.evicts().touch(e, topVer); + } } } - } - if (txFut != null) - txFut.markInitialized(); + if (txFut != null) + txFut.markInitialized(); + } IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 501cf27..45adf75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -1103,62 +1103,55 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (tx == null || !tx.isRollbackOnly()) { GridCacheVersion dhtVer = req.dhtVersion(i); - try { - GridCacheVersion ver = e.version(); - - boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); - - CacheObject val = null; - - if (ret) - val = e.innerGet(tx, - /*swap*/true, - /*read-through*/false, - /*fail-fast.*/false, - /*unmarshal*/false, - /*update-metrics*/true, - /*event notification*/req.returnValue(i), - /*temporary*/false, - CU.subjectId(tx, ctx.shared()), - null, - tx != null ? tx.resolveTaskName() : null, - null); - - assert e.lockedBy(mappedVer) || - (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) : - "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() + - ", entry=" + e + - ", mappedVer=" + mappedVer + ", ver=" + ver + - ", tx=" + tx + ", req=" + req + - ", err=" + err + ']'; - - boolean filterPassed = false; - - if (tx != null && tx.onePhaseCommit()) { - IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key())); - - assert writeEntry != null : - "Missing tx entry for locked cache entry: " + e; - - filterPassed = writeEntry.filtersPassed(); - } - - if (ret && val == null) - val = e.valueBytes(null); - - // We include values into response since they are required for local - // calls and won't be serialized. We are also including DHT version. - res.addValueBytes( - ret ? val : null, - filterPassed, - ver, - mappedVer); + GridCacheVersion ver = e.version(); + + boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); + + CacheObject val = null; + + if (ret) + val = e.innerGet(tx, + /*swap*/true, + /*read-through*/false, + /*fail-fast.*/false, + /*unmarshal*/false, + /*update-metrics*/true, + /*event notification*/req.returnValue(i), + /*temporary*/false, + CU.subjectId(tx, ctx.shared()), + null, + tx != null ? tx.resolveTaskName() : null, + null); + + assert e.lockedBy(mappedVer) || + (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) : + "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() + + ", entry=" + e + + ", mappedVer=" + mappedVer + ", ver=" + ver + + ", tx=" + tx + ", req=" + req + + ", err=" + err + ']'; + + boolean filterPassed = false; + + if (tx != null && tx.onePhaseCommit()) { + IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key())); + + assert writeEntry != null : + "Missing tx entry for locked cache entry: " + e; + + filterPassed = writeEntry.filtersPassed(); } - catch (GridCacheFilterFailedException ex) { - assert false : "Filter should never fail if fail-fast is false."; - ex.printStackTrace(); - } + if (ret && val == null) + val = e.valueBytes(null); + + // We include values into response since they are required for local + // calls and won't be serialized. We are also including DHT version. + res.addValueBytes( + ret ? val : null, + filterPassed, + ver, + mappedVer); } else { // We include values into response since they are required for local http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 761bbb0..b0f72dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -143,10 +143,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter private AtomicBoolean mapped = new AtomicBoolean(false); /** Prepare reads. */ - private Iterable<IgniteTxEntry> reads; + private Collection<IgniteTxEntry> reads; /** Prepare writes. */ - private Iterable<IgniteTxEntry> writes; + private Collection<IgniteTxEntry> writes; /** Tx nodes. */ private Map<UUID, Collection<UUID>> txNodes; @@ -429,9 +429,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter catch (GridCacheEntryRemovedException e) { assert false : "Got entry removed exception while holding transactional lock on entry: " + e; } - catch (GridCacheFilterFailedException e) { - assert false : "Got filter failed exception with fail fast false " + e; - } } } @@ -472,7 +469,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (log.isDebugEnabled()) log.debug("Marking all local candidates as ready: " + this); - Iterable<IgniteTxEntry> checkEntries = writes; + Iterable<IgniteTxEntry> checkEntries = + (tx.optimistic() && tx.serializable()) ? F.concat(false, writes, reads) : writes; for (IgniteTxEntry txEntry : checkEntries) { GridCacheContext cacheCtx = txEntry.context(); http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index d8456d0..18c6d69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -508,14 +508,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda catch (GridCacheEntryRemovedException ignored) { // No-op, will retry. } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + e); - - colocated.context().evicts().touch(entry, topVer); - - break; - } } return remote; http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 14abc4b..d9840ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -996,10 +996,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { catch (GridCacheEntryRemovedException ignored) { // No-op, retry. } - catch (GridCacheFilterFailedException ignored) { - // No-op, skip the key. - break; - } catch (GridDhtInvalidPartitionException ignored) { success = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index eab68b1..241cc07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -413,10 +413,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte catch (GridCacheEntryRemovedException ignored) { // No-op, retry. } - catch (GridCacheFilterFailedException ignored) { - // No-op, skip the key. - break; - } catch (GridDhtInvalidPartitionException ignored) { success = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 7877df5..43cc92a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -599,12 +599,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap catch (GridCacheEntryRemovedException ignored) { entry = allowLocRead ? near.peekEx(key) : null; } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + e); - - break; - } finally { if (entry != null && !reload && tx == null) cctx.evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 0b43eb3..36eef52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -491,10 +491,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return; } - cctx.mvcc().recheckPendingLocks(); - tx.addEntryMapping(mappings.values()); + cctx.mvcc().recheckPendingLocks(); + tx.transactionNodes(txMapping.transactionNodes()); checkOnePhase(); http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 7c1e3d1..8446665 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -594,10 +594,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { catch (GridCacheEntryRemovedException ignored) { // No-op, retry. } - catch (GridCacheFilterFailedException ignored) { - // No-op, skip the key. - break; - } finally { if (entry != null) ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); @@ -1282,9 +1278,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { catch (GridCacheEntryRemovedException ignore) { assert false : "Entry cannot become obsolete while holding lock."; } - catch (GridCacheFilterFailedException ignore) { - assert false : "Filter should never fail with failFast=false and empty filter."; - } } // Store final batch. http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index cc2db03..5c57bb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1272,11 +1272,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter if (F.isEmpty(txEntry.entryProcessors())) return F.t(txEntry.op(), txEntry.value()); else { - try { - boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); + boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); - CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : - txEntry.cached().innerGet(this, + CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : + txEntry.cached().innerGet(this, /*swap*/false, /*read through*/false, /*fail fast*/true, @@ -1285,75 +1284,69 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /*event*/recordEvt, /*temporary*/true, /*subjId*/subjId, - /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, - resolveTaskName(), - null); - - boolean modified = false; + /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, + resolveTaskName(), + null); - Object val = null; + boolean modified = false; - Object key = null; + Object val = null; - GridCacheVersion ver; + Object key = null; - try { - ver = txEntry.cached().version(); - } - catch (GridCacheEntryRemovedException e) { - assert optimistic() : txEntry; + GridCacheVersion ver; - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + try { + ver = txEntry.cached().version(); + } + catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; - ver = null; - } + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val, ver); + ver = null; + } - try { - EntryProcessor<Object, Object, Object> processor = t.get1(); + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(), + txEntry.key(), key, cacheVal, val, ver); - processor.process(invokeEntry, t.get2()); + try { + EntryProcessor<Object, Object, Object> processor = t.get1(); - val = invokeEntry.getValue(); + processor.process(invokeEntry, t.get2()); - key = invokeEntry.key(); - } - catch (Exception ignore) { - // No-op. - } + val = invokeEntry.getValue(); - modified |= invokeEntry.modified(); + key = invokeEntry.key(); + } + catch (Exception ignore) { + // No-op. } - if (modified) - cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val)); + modified |= invokeEntry.modified(); + } - GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; + if (modified) + cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val)); - if (op == NOOP) { - ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); + GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; - if (expiry != null) { - long ttl = CU.toTtl(expiry.getExpiryForAccess()); + if (op == NOOP) { + ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); - txEntry.ttl(ttl); + if (expiry != null) { + long ttl = CU.toTtl(expiry.getExpiryForAccess()); - if (ttl == CU.TTL_ZERO) - op = DELETE; - } - } + txEntry.ttl(ttl); - return F.t(op, cacheVal); + if (ttl == CU.TTL_ZERO) + op = DELETE; + } } - catch (GridCacheFilterFailedException e) { - assert false : "Empty filter failed for innerGet: " + e; - return null; - } + return F.t(op, cacheVal); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 56a50f9..51a3316 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -468,7 +468,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter misses.put(key, entry.version()); } else - c.apply(key, res.get1(), res.get2()); + c.apply(key, skipVals ? true : res.get1(), res.get2()); break; } @@ -1413,13 +1413,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter break; } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + txEntry); - - if (!readCommitted()) - txEntry.readValue(e.<V>value()); - } catch (GridCacheEntryRemovedException ignored) { txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); } @@ -1527,34 +1520,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + entry); - - if (!readCommitted()) { - // Value for which failure occurred. - CacheObject val = e.value(); - - txEntry = addEntry(READ, - val, - null, - null, - entry, - expiryPlc, - CU.empty0(), - false, - -1L, - -1L, - null, - skipStore); - - // Mark as checked immediately for non-pessimistic. - if (val != null && !pessimistic()) - txEntry.markValid(); - } - - break; // While loop. - } finally { if (cacheCtx.isNear() && entry != null && readCommitted()) { if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) { @@ -1864,20 +1829,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.cached(entryEx(cacheCtx, txKey)); } - catch (GridCacheFilterFailedException e) { - // Failed value for the filter. - CacheObject val = e.value(); - - if (val != null) { - // If filter fails after lock is acquired, we don't reload, - // regardless if value is null or not. - missed.remove(cacheKey); - - txEntry.setAndMarkValid(val); - } - - break; // While. - } } } @@ -2206,11 +2157,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter throw e; } - catch (GridCacheFilterFailedException e) { - e.printStackTrace(); - - assert false : "Empty filter failed: " + e; - } } else old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); @@ -2577,29 +2523,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (retval || invoke) { if (!cacheCtx.isNear()) { - try { - if (!hasPrevVal) { - boolean readThrough = - (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore(); - - v = cached.innerGet(this, - /*swap*/true, - readThrough, - /*failFast*/false, - /*unmarshal*/true, - /*metrics*/!invoke, - /*event*/!invoke && !dht(), - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - null); - } - } - catch (GridCacheFilterFailedException e) { - e.printStackTrace(); + if (!hasPrevVal) { + boolean readThrough = + (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore(); - assert false : "Empty filter failed: " + e; + v = cached.innerGet(this, + /*swap*/true, + readThrough, + /*failFast*/false, + /*unmarshal*/true, + /*metrics*/!invoke, + /*event*/!invoke && !dht(), + /*temporary*/false, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + null); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index c86434c..2342a5d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -3079,7 +3079,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { assertNotNull(cache); - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); while (U.currentTimeMillis() < stopTime) { final Map<Integer, Integer> keys = new LinkedHashMap<>(); @@ -3091,10 +3091,20 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { if (restart) { doInTransaction(node, OPTIMISTIC, SERIALIZABLE, new Callable<Void>() { @Override public Void call() throws Exception { - if (get) - cache.getAll(keys.keySet()); - - cache.putAll(keys); + if (get) { + for (Map.Entry<Integer, Integer> e : keys.entrySet()) { + if (rnd.nextBoolean()) { + cache.get(e.getKey()); + + if (rnd.nextBoolean()) + cache.put(e.getKey(), e.getValue()); + } + else + cache.put(e.getKey(), e.getValue()); + } + } + else + cache.putAll(keys); return null; } @@ -3102,10 +3112,20 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } else { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - if (get) - cache.getAll(keys.keySet()); - - cache.putAll(keys); + if (get) { + for (Map.Entry<Integer, Integer> e : keys.entrySet()) { + if (rnd.nextBoolean()) { + cache.get(e.getKey()); + + if (rnd.nextBoolean()) + cache.put(e.getKey(), e.getValue()); + } + else + cache.put(e.getKey(), e.getValue()); + } + } + else + cache.putAll(keys); tx.commit(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index ec3ea0c..a6b5535 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -3692,7 +3692,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract if (txShouldBeUsed()) { try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { // Remove missing key. - assertTrue(jcache().remove(UUID.randomUUID().toString())); + assertFalse(jcache().remove(UUID.randomUUID().toString())); tx.commit(); } @@ -3708,7 +3708,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract if (txShouldBeUsed()) { try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { // Remove missing key. - assertTrue(jcache().remove(UUID.randomUUID().toString())); + assertFalse(jcache().remove(UUID.randomUUID().toString())); tx.setRollbackOnly(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/183c1969/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java index 263c453..7fe0138 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -176,7 +177,7 @@ public class IgniteCacheCrossCacheTxFailoverTest extends GridCommonAbstractTest /** * @throws Exception If failed. */ - public void _testCrossCacheOptimisticSerializableTxFailover() throws Exception { + public void testCrossCacheOptimisticSerializableTxFailover() throws Exception { crossCacheTxFailover(PARTITIONED, true, OPTIMISTIC, SERIALIZABLE, TestMemoryMode.HEAP); } @@ -432,6 +433,11 @@ public class IgniteCacheCrossCacheTxFailoverTest extends GridCommonAbstractTest @Override public int hashCode() { return (int)(key ^ (key >>> 32)); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestKey.class, this); + } } /** @@ -454,6 +460,11 @@ public class IgniteCacheCrossCacheTxFailoverTest extends GridCommonAbstractTest public long value() { return val; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } } /**
