Merge branch ignite-1.5 into ignite-1282
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b6adb29 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b6adb29 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b6adb29 Branch: refs/heads/ignite-1.5-tx-futs-opts Commit: 9b6adb2903d69ac6e5cbf6e1677ee2ec9447fb2c Parents: 3d4ce80 Author: Alexey Goncharuk <[email protected]> Authored: Fri Nov 20 14:04:22 2015 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Nov 20 14:04:22 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 4 +- .../GridDistributedTxPrepareRequest.java | 3 +- .../dht/GridPartitionedSingleGetFuture.java | 6 +- .../dht/colocated/GridDhtColocatedCache.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 71 ++++++++++++-------- 5 files changed, 50 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9b6adb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 512a801..2b40351 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1986,7 +1986,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme (EntryProcessor<Object, Object, ?>)writeObj; CacheInvokeEntry<Object, Object> entry = - new CacheInvokeEntry<>(cctx, key, prevVal, version()); + new CacheInvokeEntry<>(cctx, key, prevVal, version(), keepPortable); try { entryProcessor.process(entry, invokeArgs); @@ -1994,7 +1994,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme evtVal = entry.modified() ? cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; } - catch (Exception e) { + catch (Exception ignore) { evtVal = prevVal; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b6adb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 95176ff..e595942 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -329,8 +329,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage } // Marshal txNodes only if there is a node in topology with an older version. - if (ctx.exchange().minimumNodeVersion(topologyVersion()) - .compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) { + if (ctx.exchange().minimumNodeVersion(topologyVersion()).compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) { if (txNodes != null && txNodesBytes == null) txNodesBytes = ctx.marshaller().marshal(txNodes); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b6adb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 8f2357b..32f4e80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -347,7 +347,8 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im subjId, null, taskName, - expiryPlc); + expiryPlc, + true); if (res != null) { v = res.get1(); @@ -366,7 +367,8 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im subjId, null, taskName, - expiryPlc); + expiryPlc, + true); } colocated.context().evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b6adb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 72f5cf5..b69b42c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -239,7 +239,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } }); } - }); + }, opCtx); } AffinityTopologyVersion topVer = tx == null ? http://git-wip-us.apache.org/repos/asf/ignite/blob/9b6adb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index fae7d8c..7c6a1d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -67,7 +67,6 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -2047,7 +2046,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final CacheEntryPredicate[] filter, final GridCacheReturn ret, boolean skipStore, - final boolean singleRmv) { + final boolean singleRmv, + boolean keepBinary) { try { addActiveCache(cacheCtx); @@ -2076,7 +2076,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter singleRmv, hasFilters, needVal, - needReadVer); + needReadVer, + keepBinary); if (loadMissed) { return loadMissing(cacheCtx, @@ -2087,7 +2088,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter singleRmv, hasFilters, skipStore, - retval); + retval, + keepBinary); } return new GridFinishedFuture<>(); @@ -2146,8 +2148,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean rmv = lookup == null && invokeMap == null; - Set<KeyCacheObject> missedForLoad = null; - final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); final boolean needVal = singleRmv || retval || hasFilters; final boolean needReadVer = needVal && (serializable() && optimistic()); @@ -2157,6 +2157,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (invokeMap != null) transform = true; + Set<KeyCacheObject> missedForLoad = null; + for (Object key : keys) { if (key == null) { rollback(); @@ -2219,7 +2221,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter singleRmv, hasFilters, needVal, - needReadVer); + needReadVer, + keepBinary); if (loadMissed) { if (missedForLoad == null) @@ -2238,7 +2241,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter singleRmv, hasFilters, skipStore, - retval); + retval, + keepBinary); } return new GridFinishedFuture<>(); @@ -2269,7 +2273,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final boolean singleRmv, final boolean hasFilters, final boolean skipStore, - final boolean retval) { + final boolean retval, + final boolean keepBinary) { GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c = new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, @@ -2317,7 +2322,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter else { boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); - ret.set(cacheCtx, cacheVal, success); + ret.set(cacheCtx, cacheVal, success, keepBinary); } } } @@ -2330,6 +2335,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter keys, /*skipVals*/singleRmv, needReadVer, + keepBinary, c); } @@ -2341,7 +2347,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param invokeArgs Optional arguments for EntryProcessor. * @param expiryPlc Explicitly specified expiry policy for entry. * @param retval Return value flag. - * @param lockOnly + * @param lockOnly Lock only flag. * @param filter Filter. * @param drVer DR version. * @param drTtl DR ttl. @@ -2358,10 +2364,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ private boolean enlistWriteEntry(GridCacheContext cacheCtx, final KeyCacheObject cacheKey, - final @Nullable Object val, - final @Nullable EntryProcessor<?, ?, ?> entryProcessor, - final @Nullable Object[] invokeArgs, - final @Nullable ExpiryPolicy expiryPlc, + @Nullable final Object val, + @Nullable final EntryProcessor<?, ?, ?> entryProcessor, + @Nullable final Object[] invokeArgs, + @Nullable final ExpiryPolicy expiryPlc, final boolean retval, final boolean lockOnly, final CacheEntryPredicate[] filter, @@ -2374,7 +2380,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean singleRmv, boolean hasFilters, final boolean needVal, - boolean needReadVer + boolean needReadVer, + boolean keepBinary ) throws IgniteCheckedException { boolean loadMissed = false; @@ -2418,7 +2425,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), entryProcessor, resolveTaskName(), - null) : null; + null, + keepBinary) : null; if (res != null) { old = res.get1(); @@ -2437,7 +2445,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), entryProcessor, resolveTaskName(), - null); + null, + keepBinary); } } catch (ClusterTopologyCheckedException e) { @@ -2450,7 +2459,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { - ret.set(cacheCtx, old, false); + ret.set(cacheCtx, old, false, keepBinary); if (!readCommitted()) { // Enlist failed filters as reads for non-read-committed mode, @@ -2466,7 +2475,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter -1L, -1L, null, - skipStore); + skipStore, + keepBinary); txEntry.markValid(); @@ -2497,7 +2507,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter drTtl, drExpireTime, drVer, - skipStore); + skipStore, + keepBinary); if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) cacheCtx.evicts().touch(entry, topologyVersion()); @@ -2516,7 +2527,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert txEntry.op() != TRANSFORM : txEntry; if (retval) - ret.set(cacheCtx, null, true); + ret.set(cacheCtx, null, true, keepBinary); else ret.success(true); } @@ -2529,7 +2540,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (retval && !transform) - ret.set(cacheCtx, old, true); + ret.set(cacheCtx, old, true, keepBinary); else { if (txEntry.op() == TRANSFORM) { GridCacheVersion ver; @@ -2557,7 +2568,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Pessimistic. else { if (retval && !transform) - ret.set(cacheCtx, old, true); + ret.set(cacheCtx, old, true, keepBinary); else ret.success(true); } @@ -2583,7 +2594,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!del) { if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { - ret.set(cacheCtx, v, false); + ret.set(cacheCtx, v, false, keepBinary); return loadMissed; } @@ -2602,7 +2613,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter drTtl, drExpireTime, drVer, - skipStore); + skipStore, + keepBinary); if (enlisted != null) enlisted.add(cacheKey); @@ -2630,7 +2642,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.markValid(); if (retval && !transform) - ret.set(cacheCtx, v, true); + ret.set(cacheCtx, v, true, keepBinary); else ret.success(true); } @@ -2904,7 +2916,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final GridCacheContext cacheCtx, K key, @Nullable V val, - @Nullable EntryProcessor entryProcessor, + @Nullable EntryProcessor<K, V, Object> entryProcessor, @Nullable final Object[] invokeArgs, final boolean retval, @Nullable final CacheEntryPredicate[] filter @@ -2932,7 +2944,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter filter, ret, opCtx != null && opCtx.skipStore(), - /*singleRmv*/false); + /*singleRmv*/false, + opCtx != null && opCtx.isKeepBinary()); if (pessimistic()) { assert loadFut == null || loadFut.isDone() : loadFut;
