ignite-1607 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6849ebe1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6849ebe1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6849ebe1 Branch: refs/heads/ignite-1607-read Commit: 6849ebe10779265cbd22f1afb35bb40c12529881 Parents: 54bbc75 Author: sboikov <[email protected]> Authored: Wed Oct 7 10:33:34 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 7 17:31:09 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMapEntry.java | 5 +- .../dht/CacheDistributedGetFutureAdapter.java | 203 +++ .../distributed/dht/GridDhtCacheEntry.java | 2 +- .../dht/GridPartitionedGetFuture.java | 106 +- .../dht/colocated/GridDhtColocatedCache.java | 1 + .../distributed/near/GridNearGetFuture.java | 101 +- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 42 +- .../cache/transactions/IgniteTxAdapter.java | 6 +- .../cache/transactions/IgniteTxEntry.java | 3 +- .../transactions/IgniteTxLocalAdapter.java | 457 ++++--- .../CacheSerializableTransactionsTest.java | 1230 +++++++++++++++--- .../processors/cache/GridCacheTestEntryEx.java | 4 +- 14 files changed, 1629 insertions(+), 538 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index bc36d2c..9106b05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -309,6 +309,7 @@ public interface GridCacheEntryEx { throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException; /** + * @param tx Cache transaction. * @param readSwap Flag indicating whether to check swap memory. * @param unmarshal Unmarshal flag. * @param updateMetrics If {@code true} then metrics should be updated. @@ -321,7 +322,9 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap, + @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned( + IgniteInternalTx tx, + boolean readSwap, boolean unmarshal, boolean updateMetrics, boolean evt, http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index b22f9b4..9378017 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -691,6 +691,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** {@inheritDoc} */ @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned( + IgniteInternalTx tx, boolean readSwap, boolean unmarshal, boolean updateMetrics, @@ -700,7 +701,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc) throws IgniteCheckedException, GridCacheEntryRemovedException { - return (T2<CacheObject, GridCacheVersion>)innerGet0(null, + return (T2<CacheObject, GridCacheVersion>)innerGet0(tx, readSwap, false, evt, @@ -711,7 +712,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme transformClo, taskName, expiryPlc, - false); + true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java new file mode 100644 index 0000000..459362b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheFuture; +import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; +import org.apache.ignite.internal.util.lang.GridInClosure3; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgniteReducer; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; +import static org.apache.ignite.IgniteSystemProperties.getInteger; + +/** + * + */ +public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>> + implements GridCacheFuture<Map<K, V>> { + /** Default max remap count value. */ + public static final int DFLT_MAX_REMAP_CNT = 3; + + /** Maximum number of attempts to remap key to the same primary node. */ + protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); + + /** Context. */ + protected final GridCacheContext<K, V> cctx; + + /** Keys. */ + protected Collection<KeyCacheObject> keys; + + /** Reload flag. */ + protected boolean reload; + + /** Read through flag. */ + protected boolean readThrough; + + /** Force primary flag. */ + protected boolean forcePrimary; + + /** Future ID. */ + protected IgniteUuid futId; + + /** Trackable flag. */ + protected boolean trackable; + + /** Remap count. */ + protected AtomicInteger remapCnt = new AtomicInteger(); + + /** Subject ID. */ + protected UUID subjId; + + /** Task name. */ + protected String taskName; + + /** Whether to deserialize portable objects. */ + protected boolean deserializePortable; + + /** Skip values flag. */ + protected boolean skipVals; + + /** Expiry policy. */ + protected IgniteCacheExpiryPolicy expiryPlc; + + /** Flag indicating that get should be done on a locked topology version. */ + protected final boolean canRemap; + + /** */ + protected final boolean needVer; + + /** */ + protected final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC; + + /** + * @param cctx Context. + * @param keys Keys. + * @param readThrough Read through flag. + * @param reload Reload flag. + * @param forcePrimary If {@code true} then will force network trip to primary node even + * if called on backup node. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param resC Closure applied on 'get' result. + * @param needVer If {@code true} need provide entry version to result closure. + */ + protected CacheDistributedGetFutureAdapter( + GridCacheContext<K, V> cctx, + Collection<KeyCacheObject> keys, + boolean readThrough, + boolean reload, + boolean forcePrimary, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean skipVals, + boolean canRemap, + boolean needVer, + @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC + ) { + super(cctx.kernalContext(), + resC != null ? new ResultClosureReducer<K, V>(keys.size()) : CU.<K, V>mapsReducer(keys.size())); + + assert !F.isEmpty(keys); + assert !needVer || resC != null; + + this.cctx = cctx; + this.keys = keys; + this.readThrough = readThrough; + this.reload = reload; + this.forcePrimary = forcePrimary; + this.subjId = subjId; + this.taskName = taskName; + this.deserializePortable = deserializePortable; + this.expiryPlc = expiryPlc; + this.skipVals = skipVals; + this.canRemap = canRemap; + this.needVer = needVer; + this.resC = resC; + + futId = IgniteUuid.randomUuid(); + } + + /** + * @param key Key. + * @param val Value. + * @param ver Version. + */ + @SuppressWarnings("unchecked") + protected final void resultClosureValue(KeyCacheObject key, Object val, GridCacheVersion ver) { + assert resC != null; + + ResultClosureReducer<K, V> rdc = (ResultClosureReducer)reducer(); + + assert rdc != null; + + rdc.collect(key); + + resC.apply(key, val, ver); + } + + /** + * + */ + private static class ResultClosureReducer<K, V> implements IgniteReducer<Map<K, V>, Map<K, V>> { + /** */ + private final ConcurrentHashMap8<KeyCacheObject, Boolean> map; + + /** + * @param keys Number of keys. + */ + public ResultClosureReducer(int keys) { + this.map = new ConcurrentHashMap8<>(keys); + } + + /** + * @param key Key. + */ + void collect(KeyCacheObject key) { + map.put(key, Boolean.TRUE); + } + + /** {@inheritDoc} */ + @Override public boolean collect(@Nullable Map<K, V> map) { + return true; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> reduce() { + return (Map)map; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 90f6551..0004f02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -199,7 +199,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { if (serReadVer != null) { if (!serReadVer.equals(this.ver)) { - if (!(isNewLocked() && serReadVer.equals(IgniteTxEntry.SER_READ_NEW_ENTRY_VER))) + if (!((isNew() || deleted()) && serReadVer.equals(IgniteTxEntry.READ_NEW_ENTRY_VER))) return null; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 6b8c2ab..d8456d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -24,11 +24,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -40,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -49,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridInClosure3; @@ -67,83 +63,25 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; - /** * Colocated get future. */ -public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>> - implements GridCacheFuture<Map<K, V>> { +public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> { /** */ private static final long serialVersionUID = 0L; - /** Default max remap count value. */ - public static final int DFLT_MAX_REMAP_CNT = 3; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** Logger. */ private static IgniteLogger log; - /** Maximum number of attempts to remap key to the same primary node. */ - private static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger(IGNITE_NEAR_GET_MAX_REMAPS, - DFLT_MAX_REMAP_CNT); - - /** Context. */ - private final GridCacheContext<K, V> cctx; - - /** Keys. */ - private Collection<KeyCacheObject> keys; - /** Topology version. */ private AffinityTopologyVersion topVer; - /** Reload flag. */ - private boolean reload; - - /** Read-through flag. */ - private boolean readThrough; - - /** Force primary flag. */ - private boolean forcePrimary; - - /** Future ID. */ - private IgniteUuid futId; - /** Version. */ private GridCacheVersion ver; - /** Trackable flag. */ - private volatile boolean trackable; - - /** Remap count. */ - private AtomicInteger remapCnt = new AtomicInteger(); - - /** Subject ID. */ - private UUID subjId; - - /** Task name. */ - private String taskName; - - /** Whether to deserialize portable objects. */ - private boolean deserializePortable; - - /** Expiry policy. */ - private IgniteCacheExpiryPolicy expiryPlc; - - /** Skip values flag. */ - private boolean skipVals; - - /** Flag indicating whether future can be remapped on a newer topology version. */ - private final boolean canRemap; - - /** */ - private final boolean needVer; - - /** */ - private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC; - /** * @param cctx Context. * @param keys Keys. @@ -158,6 +96,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param needVer If {@code true} need provide entry version to result closure. * @param resC Closure applied on 'get' result. */ public GridPartitionedGetFuture( @@ -176,27 +115,21 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M boolean needVer, @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC ) { - super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); + super(cctx, + keys, + readThrough, + reload, + forcePrimary, + subjId, + taskName, + deserializePortable, + expiryPlc, + skipVals, + canRemap, + needVer, + resC); - assert !F.isEmpty(keys); - assert !needVer || resC != null; - - this.cctx = cctx; - this.keys = keys; this.topVer = topVer; - this.readThrough = readThrough; - this.reload = reload; - this.forcePrimary = forcePrimary; - this.subjId = subjId; - this.deserializePortable = deserializePortable; - this.taskName = taskName; - this.expiryPlc = expiryPlc; - this.skipVals = skipVals; - this.canRemap = canRemap; - this.needVer = needVer; - this.resC = resC; - - futId = IgniteUuid.randomUuid(); ver = cctx.versions().next(); @@ -331,7 +264,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M final int keysSize = keys.size(); - Map<K, V> locVals = U.newHashMap(keysSize); + Map<K, V> locVals = resC == null ? U.<K, V>newHashMap(keysSize) : null; boolean hasRmtNodes = false; @@ -342,7 +275,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (isDone()) return; - if (!locVals.isEmpty()) + if (!F.isEmpty(locVals)) add(new GridFinishedFuture<>(locVals)); if (hasRmtNodes) { @@ -483,6 +416,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, @@ -521,7 +455,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } else { if (resC != null) - resC.apply(key, skipVals ? true : v, ver); + resultClosureValue(key, skipVals ? true : v, ver); else cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); @@ -628,7 +562,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M for (GridCacheEntryInfo info : infos) { assert skipVals == (info.value() == null); - resC.apply(info.key(), skipVals ? true : info.value(), info.version()); + resultClosureValue(info.key(), skipVals ? true : info.value(), info.version()); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index de82068..6b6352f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -353,6 +353,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 3b70325..9ed63ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -24,7 +24,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -39,18 +38,17 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridInClosure3; @@ -68,83 +66,26 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; -import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; /** * */ -public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>> - implements GridCacheFuture<Map<K, V>> { +public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> { /** */ private static final long serialVersionUID = 0L; - /** Default max remap count value. */ - public static final int DFLT_MAX_REMAP_CNT = 3; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** Logger. */ private static IgniteLogger log; - /** Maximum number of attempts to remap key to the same primary node. */ - private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); - - /** Context. */ - private final GridCacheContext<K, V> cctx; - - /** Keys. */ - private Collection<KeyCacheObject> keys; - - /** Reload flag. */ - private boolean reload; - - /** Read through flag. */ - private boolean readThrough; - - /** Force primary flag. */ - private boolean forcePrimary; - - /** Future ID. */ - private IgniteUuid futId; - - /** Version. */ - private GridCacheVersion ver; - /** Transaction. */ private IgniteTxLocalEx tx; - /** Trackable flag. */ - private boolean trackable; - - /** Remap count. */ - private AtomicInteger remapCnt = new AtomicInteger(); - - /** Subject ID. */ - private UUID subjId; - - /** Task name. */ - private String taskName; - - /** Whether to deserialize portable objects. */ - private boolean deserializePortable; - - /** Skip values flag. */ - private boolean skipVals; - - /** Expiry policy. */ - private IgniteCacheExpiryPolicy expiryPlc; - - /** Flag indicating that get should be done on a locked topology version. */ - private final boolean canRemap; - /** */ - private final boolean needVer; - - /** */ - private final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC; + private GridCacheVersion ver; /** * @param cctx Context. @@ -159,6 +100,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param needVer If {@code true} need provide entry version to result closure. + * @param resC Closure applied on 'get' result. */ public GridNearGetFuture( GridCacheContext<K, V> cctx, @@ -176,25 +120,24 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma boolean needVer, @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC ) { - super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); + super(cctx, + keys, + readThrough, + reload, + forcePrimary, + subjId, + taskName, + deserializePortable, + expiryPlc, + skipVals, + canRemap, + needVer, + resC); assert !F.isEmpty(keys); assert !needVer || resC != null; - this.cctx = cctx; - this.keys = keys; - this.readThrough = readThrough; - this.reload = reload; - this.forcePrimary = forcePrimary; this.tx = tx; - this.subjId = subjId; - this.taskName = taskName; - this.deserializePortable = deserializePortable; - this.expiryPlc = expiryPlc; - this.skipVals = skipVals; - this.canRemap = canRemap; - this.needVer = needVer; - this.resC = resC; futId = IgniteUuid.randomUuid(); @@ -474,6 +417,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (isNear) { if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/true, @@ -520,6 +464,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, @@ -595,7 +540,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else - resC.apply(key, v, ver); + resultClosureValue(key, v, ver); } else { if (affNode == null) { @@ -761,7 +706,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma assert skipVals == (info.value() == null); if (resC != null) - resC.apply(key, skipVals ? true : val, info.version()); + resultClosureValue(key, skipVals ? true : val, info.version()); else cctx.addResult(map, key, val, skipVals, false, deserializePortable, false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 4a7efb4..e48601d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -570,7 +570,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre // Must lock near entries separately. if (m.near()) { try { - tx.optimisticLockEntries(req.writes()); + tx.optimisticLockEntries(m.entries()); tx.userPrepare(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 2c2915f..c43cab5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; @@ -362,7 +363,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { needVer, c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { - return true; + try { + Map<Object, Object> map = f.get(); + + if (map != null && map.size() != keys.size()) { + for (KeyCacheObject key : keys) { + if (!map.containsKey(key)) + c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER); + } + } + + return true; + } + catch (Exception e) { + setRollbackOnly(); + + throw new GridClosureException(e); + } } }); } @@ -383,7 +400,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { c ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { - return true; + try { + Map<Object, Object> map = f.get(); + + if (map != null && map.size() != keys.size()) { + for (KeyCacheObject key : keys) { + if (!map.containsKey(key)) + c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER); + } + } + + return true; + } + catch (Exception e) { + setRollbackOnly(); + + throw new GridClosureException(e); + } } }); } else { @@ -868,7 +901,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal( @Nullable Collection<IgniteTxEntry> reads, @Nullable Collection<IgniteTxEntry> writes, - Map<UUID, Collection<UUID>> txNodes, boolean last, + Map<UUID, Collection<UUID>> txNodes, + boolean last, Collection<UUID> lastBackups ) { if (state() != PREPARING) { @@ -896,7 +930,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { try { // At this point all the entries passed in must be enlisted in transaction because this is an // optimistic transaction. - optimisticLockEntries = writes; + optimisticLockEntries = optimistic() && serializable() ? F.concat(false, writes, reads) : writes; userPrepare(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0286efe..c896f6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -325,7 +325,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter threadId = Thread.currentThread().getId(); - log = U.logger(cctx.kernalContext(), logRef, this); + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, this); } /** @@ -374,7 +375,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implicitSingle = false; loc = false; - log = U.logger(cctx.kernalContext(), logRef, this); + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, this); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index f1cd2d4..7929167 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -67,7 +67,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private static final long serialVersionUID = 0L; /** Dummy version for non-existing entry read in SERIALIZABLE transaction. */ - public static final GridCacheVersion SER_READ_NEW_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0); + public static final GridCacheVersion READ_NEW_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0); /** Owning transaction. */ @GridToStringExclude @@ -322,6 +322,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { cp.conflictVer = conflictVer; cp.expiryPlc = expiryPlc; cp.flags = flags; + cp.serReadVer = serReadVer; return cp; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 2b745ac..76df164 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; @@ -1268,11 +1269,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter AffinityTopologyVersion topVer = topologyVersion(); + boolean needReadVer = optimistic() && serializable(); + // In this loop we cover only read-committed or optimistic transactions. // Transactions that are pessimistic and not read-committed are covered // outside of this loop. for (KeyCacheObject key : keys) { - if (pessimistic() && !readCommitted() && !skipVals) + if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals) addActiveCache(cacheCtx); IgniteTxKey txKey = cacheCtx.txKey(key); @@ -1370,24 +1373,42 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter GridCacheVersion ver = entry.version(); CacheObject val = null; + GridCacheVersion readVer = null; if (!pessimistic() || readCommitted() && !skipVals) { IgniteCacheExpiryPolicy accessPlc = optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; - // This call will check for filter. - val = entry.innerGet(this, - /*swap*/true, - /*no read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*event*/true, - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - accessPlc); + if (needReadVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this, + /*swap*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + accessPlc); + + if (res != null) { + val = res.get1(); + readVer = res.get2(); + } + } + else { + val = entry.innerGet(this, + /*swap*/true, + /*no read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + /*temporary*/false, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + accessPlc); + } if (val != null) { cacheCtx.addResult(map, @@ -1421,8 +1442,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // As optimization, mark as checked immediately // for non-pessimistic if value is not null. - if (val != null && !pessimistic()) + if (val != null && !pessimistic()) { txEntry.markValid(); + + if (needReadVer) { + assert readVer != null; + + txEntry.serializableReadVersion(readVer); + } + } } break; // While. @@ -1532,9 +1560,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Loading missed values for missed map: " + missedMap); - final Collection<KeyCacheObject> loaded = U.newHashSet(missedMap.size()); + final Collection<KeyCacheObject> loaded = + readCommitted() ? U.<KeyCacheObject>newHashSet(missedMap.size()) : null; - final boolean needVer = optimistic() && serializable(); + final boolean needReadVer = optimistic() && serializable(); return new GridEmbeddedFuture<>( new C2<Boolean, Exception, Map<K, V>>() { @@ -1555,27 +1584,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } - // In read-committed mode touch entries that have just been read. - boolean touch = readCommitted(); + if (readCommitted()) { + assert loaded != null; - for (KeyCacheObject key : missedMap.keySet()) { - if (loaded.contains(key)) - continue; + Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet()); - GridCacheVersion ver = needVer ? IgniteTxEntry.SER_READ_NEW_ENTRY_VER : null; + notFound.removeAll(loaded); - onLoaded(key, - null, - ver, - cacheCtx, - map, - missedMap, - deserializePortable, - skipVals, - keepCacheObjects, - loaded); + // In read-committed mode touch entries that have just been read. + for (KeyCacheObject key : notFound) { + if (loaded.contains(key)) + continue; - if (touch) { IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) : @@ -1596,174 +1616,141 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedMap.keySet(), deserializePortable, skipVals, - needVer, + needReadVer, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { - @Override public void apply(KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer) { - onLoaded(key, - val, - loadVer, - cacheCtx, - map, - missedMap, - deserializePortable, - skipVals, - keepCacheObjects, - loaded); - } - }) - ); - } + /** */ + private GridCacheVersion nextVer; - /** - * @param key Key. - * @param val Value. - * @param loadVer Entry version. - * @param cacheCtx Cache context. - * @param map Return map. - * @param missedMap Missed keys. - * @param deserializePortable Deserialize portable flag. - * @param skipVals Skip values flag. - * @param keepCacheObjects Keep cache objects flag. - * @param loaded Loaded values map. - */ - private <K, V> void onLoaded( - KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer, - GridCacheContext cacheCtx, - Map<K, V> map, - Map<KeyCacheObject, GridCacheVersion> missedMap, - final boolean deserializePortable, - boolean skipVals, - boolean keepCacheObjects, - Collection<KeyCacheObject> loaded) { - if (isRollbackOnly()) { - if (log.isDebugEnabled()) - log.debug("Ignoring loaded value for read because transaction was rolled back: " + - IgniteTxLocalAdapter.this); + @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { + if (isRollbackOnly()) { + if (log.isDebugEnabled()) + log.debug("Ignoring loaded value for read because transaction was rolled back: " + + IgniteTxLocalAdapter.this); - return; - } + return; + } - GridCacheVersion ver = missedMap.get(key); + GridCacheVersion ver = missedMap.get(key); - if (ver == null) { - if (log.isDebugEnabled()) - log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); + if (ver == null) { + if (log.isDebugEnabled()) + log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); - return; - } + return; + } - CacheObject cacheVal = cacheCtx.toCacheObject(val); + CacheObject cacheVal = cacheCtx.toCacheObject(val); - CacheObject visibleVal = cacheVal; + CacheObject visibleVal = cacheVal; - IgniteTxKey txKey = cacheCtx.txKey(key); + IgniteTxKey txKey = cacheCtx.txKey(key); - IgniteTxEntry txEntry = entry(txKey); + IgniteTxEntry txEntry = entry(txKey); - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(cacheVal); + if (txEntry != null) { + if (!readCommitted()) + txEntry.readValue(cacheVal); - if (!F.isEmpty(txEntry.entryProcessors())) - visibleVal = txEntry.applyEntryProcessors(visibleVal); - } + if (!F.isEmpty(txEntry.entryProcessors())) + visibleVal = txEntry.applyEntryProcessors(visibleVal); + } - // In pessimistic mode we hold the lock, so filter validation - // should always be valid. - if (pessimistic()) - ver = null; + // In pessimistic mode we hold the lock, so filter validation + // should always be valid. + if (pessimistic()) + ver = null; - // Initialize next version. - GridCacheVersion nextVer = cctx.versions().next(topologyVersion()); + // Initialize next version. + if (nextVer == null) + nextVer = cctx.versions().next(topologyVersion()); - while (true) { - assert txEntry != null || readCommitted() || skipVals; + while (true) { + assert txEntry != null || readCommitted() || skipVals; - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - try { - // Must initialize to true since even if filter didn't pass, - // we still record the transaction value. - boolean set; + try { + // Must initialize to true since even if filter didn't pass, + // we still record the transaction value. + boolean set; - try { - set = e.versionedValue(cacheVal, ver, nextVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAll method " + - "(will try again): " + e); + try { + set = e.versionedValue(cacheVal, ver, nextVer); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction getAll method " + + "(will try again): " + e); - if (pessimistic() && !readCommitted() && !isRollbackOnly()) { - U.error(log, "Inconsistent transaction state (entry got removed while " + - "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); + if (pessimistic() && !readCommitted() && !isRollbackOnly()) { + U.error(log, "Inconsistent transaction state (entry got removed while " + + "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); - setRollbackOnly(); + setRollbackOnly(); - return; - } + return; + } - if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey)); + if (txEntry != null) + txEntry.cached(entryEx(cacheCtx, txKey)); - continue; // While loop. - } + continue; // While loop. + } - // In pessimistic mode, we should always be able to set. - assert set || !pessimistic(); + // In pessimistic mode, we should always be able to set. + assert set || !pessimistic(); - if (readCommitted() || skipVals) { - cacheCtx.evicts().touch(e, topologyVersion()); + if (readCommitted() || skipVals) { + cacheCtx.evicts().touch(e, topologyVersion()); - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } - else { - assert txEntry != null; + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); + } + } + else { + assert txEntry != null; - txEntry.setAndMarkValid(cacheVal); + txEntry.setAndMarkValid(cacheVal); - if (optimistic() && serializable()) { - assert loadVer != null; + if (needReadVer) { + assert loadVer != null; - txEntry.serializableReadVersion(loadVer); - } + txEntry.serializableReadVersion(loadVer); + } - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); + } + } - if (val != null) - loaded.add(key); + if (readCommitted()) + loaded.add(key); - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry from transaction [set=" + set + - ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry from transaction [set=" + set + + ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); - break; // While loop. - } - catch (IgniteCheckedException ex) { - throw new IgniteException("Failed to put value for cache entry: " + e, ex); - } - } + break; // While loop. + } + catch (IgniteCheckedException ex) { + throw new IgniteException("Failed to put value for cache entry: " + e, ex); + } + } + } + }) + ); } /** {@inheritDoc} */ @@ -2066,14 +2053,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** * Checks filter for non-pessimistic transactions. * - * @param cached Cached entry. + * @param cctx Cache context. + * @param key Key. + * @param val Value. * @param filter Filter to check. * @return {@code True} if passed or pessimistic. - * @throws IgniteCheckedException If failed. */ - private <K, V> boolean filter(GridCacheEntryEx cached, - CacheEntryPredicate[] filter) throws IgniteCheckedException { - return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter); + private boolean filter( + GridCacheContext cctx, + KeyCacheObject key, + CacheObject val, + CacheEntryPredicate[] filter) { + return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter); } /** @@ -2097,7 +2088,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param skipStore Skip store flag. * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). */ - protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite( + private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite( final GridCacheContext cacheCtx, Collection<?> keys, @Nullable GridCacheEntryEx cached, @@ -2108,7 +2099,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter @Nullable Object[] invokeArgs, boolean retval, boolean lockOnly, - CacheEntryPredicate[] filter, + final CacheEntryPredicate[] filter, final GridCacheReturn ret, Collection<KeyCacheObject> enlisted, @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap, @@ -2117,6 +2108,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ) { assert cached == null || keys.size() == 1; assert cached == null || F.first(keys).equals(cached.key()); + assert retval || invokeMap == null; try { addActiveCache(cacheCtx); @@ -2131,6 +2123,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter Set<KeyCacheObject> missedForLoad = null; + final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + final boolean needReadVer = (retval || hasFilters) && (optimistic() && serializable()); + try { // Set transform flag for transaction. if (invokeMap != null) @@ -2210,24 +2205,40 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ", locNodeId=" + cctx.localNodeId() + ']'); CacheObject old = null; - - boolean readThrough = !skipStore && !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + GridCacheVersion readVer = null; if (optimistic() && !implicit()) { try { - // Should read through if filter is specified. - old = entry.innerGet(this, - /*swap*/false, - /*read-through*/readThrough && cacheCtx.loadPreviousValue(), - /*fail-fast*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - /*temporary*/false, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null); + if (needReadVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this, + /*swap*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null); + + if (res != null) { + old = res.get1(); + readVer = res.get2(); + } + } + else { + old = entry.innerGet(this, + /*swap*/false, + /*read-through*/false, + /*fail-fast*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + /*temporary*/false, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null); + } } catch (ClusterTopologyCheckedException e) { entry.context().evicts().touch(entry, topologyVersion()); @@ -2243,12 +2254,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter else old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); - if (!filter(entry, filter)) { + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { skipped = skip(skipped, cacheKey); ret.set(cacheCtx, old, false); - if (!readCommitted() && old != null) { + if (!readCommitted()) { // Enlist failed filters as reads for non-read-committed mode, // so future ops will get the same values. txEntry = addEntry(READ, @@ -2265,9 +2276,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter skipStore); txEntry.markValid(); + + if (needReadVer) { + assert readVer != null; + + txEntry.serializableReadVersion(readVer); + } } - if (readCommitted() || old == null) + if (readCommitted()) cacheCtx.evicts().touch(entry, topologyVersion()); break; // While. @@ -2298,7 +2315,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.markValid(); if (old == null) { - boolean load = retval && !readThrough; + boolean load = retval || hasFilters; if (load) { if (missedForLoad == null) @@ -2317,6 +2334,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { + if (needReadVer) { + assert readVer != null; + + txEntry.serializableReadVersion(readVer); + } + if (retval && !transform) ret.set(cacheCtx, old, true); else { @@ -2362,7 +2385,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter else { if (entryProcessor == null && txEntry.op() == TRANSFORM) throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + - "transaction after transform closure is applied): " + key); + "transaction after EntryProcessor is applied): " + key); GridCacheEntryEx entry = txEntry.cached(); @@ -2371,7 +2394,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean del = txEntry.op() == DELETE && rmv; if (!del) { - if (!filter(entry, filter)) { + if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { skipped = skip(skipped, cacheKey); ret.set(cacheCtx, v, false); @@ -2439,7 +2462,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedForLoad, deserializePortables(cacheCtx), /*skip values*/false, - false, + needReadVer, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, @Nullable Object val, @@ -2451,6 +2474,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert e != null; + if (needReadVer) { + assert loadVer != null; + + e.serializableReadVersion(loadVer); + } + CacheObject cacheVal = cacheCtx.toCacheObject(val); if (e.op() == TRANSFORM) { @@ -2470,8 +2499,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter addInvokeResult(e, cacheVal, ret, ver); } - else - ret.set(cacheCtx, cacheVal, true); + else { + boolean success = hasFilters ? isAll(e.context(), key, cacheVal, filter) : true; + + ret.set(cacheCtx, cacheVal, success); + } } }); @@ -2491,6 +2523,31 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cctx Cache context. + * @param key Key. + * @param val Value. + * @param filter Filter. + * @return {@code True} if filter passed. + */ + private boolean isAll(GridCacheContext cctx, + KeyCacheObject key, + CacheObject val, + CacheEntryPredicate[] filter) { + GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) { + @Nullable @Override public CacheObject peekVisibleValue() { + return rawGet(); + } + }; + + for (CacheEntryPredicate p0 : filter) { + if (p0 != null && !p0.apply(e)) + return false; + } + + return true; + } + + /** * Post lock processing for put or remove. * * @param cacheCtx Context.
