http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index bbd2db0,14ce1f9..79358e8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@@ -211,47 -208,34 +216,55 @@@ public interface GridDhtPartitionTopolo * @param exchId Exchange ID. * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @return Local partition map if there were evictions or {@code null} otherwise. + * @return {@code True} if topology state changed. */ - public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, Long> cntrMap); + @Nullable Map<Integer, T2<Long, Long>> cntrMap); /** * @param exchId Exchange ID. * @param parts Partitions. * @param cntrMap Partition update counters. - * @return Local partition map if there were evictions or {@code null} otherwise. + * @param checkEvictions Check evictions flag. + * @return {@code True} if topology state changed. */ - @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - @Nullable Map<Integer, T2<Long, Long>> cntrMap); - @Nullable Map<Integer, Long> cntrMap, ++ @Nullable Map<Integer, T2<Long, Long>> cntrMap, + boolean checkEvictions); /** + * Checks if there is at least one owner for each partition in the cache topology. + * If not, marks such a partition as LOST. + * <p> + * This method should be called on topology coordinator after all partition messages are received. + * + * @param discoEvt Discovery event for which we detect lost partitions. + * @return {@code True} if partitons state got updated. + */ + public boolean detectLostPartitions(DiscoveryEvent discoEvt); + + /** + * Resets the state of all LOST partitions to OWNING. + */ + public void resetLostPartitions(); + + /** + * @return Collection of lost partitions, if any. + */ + public Collection<Integer> lostPartitions(); + + /** + * + */ + public void checkEvictions(); + + /** + * @param skipZeros If {@code true} then filters out zero counters. * @return Partition update counters. */ - public Map<Integer, T2<Long, Long>> updateCounters(); + public Map<Integer, Long> updateCounters(boolean skipZeros); /** * @param part Partition to own.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 05c5c2d,1b4dcc9..5bda0bf --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -30,18 -30,14 +30,19 @@@ import java.util.Set import java.util.UUID; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; + import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; +import org.apache.ignite.internal.processors.cache.CacheState; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@@ -593,12 -539,8 +591,10 @@@ import static org.apache.ignite.interna /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { + treatAllPartitionAsLocal = false; + boolean changed = waitForRent(); - ClusterNode loc = cctx.localNode(); - int num = cctx.affinity().partitions(); AffinityTopologyVersion topVer = exchFut.topologyVersion(); @@@ -676,10 -618,10 +672,10 @@@ if (locPart != null) { GridDhtPartitionState state = locPart.state(); - if (state == MOVING) { + if (state == MOVING && cctx.shared().cache().globalState() == CacheState.ACTIVE) { locPart.rent(false); - updateLocal(p, loc.id(), locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); changed = true; @@@ -1067,9 -979,9 +1066,9 @@@ /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, Long> cntrMap) { + @Nullable Map<Integer, T2<Long, Long>> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@@ -1079,13 -991,13 +1078,13 @@@ try { if (stopping) - return null; + return false; if (cntrMap != null) { - for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Long cntr = this.cntrMap.get(e.getKey()); + for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { + T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); - if (cntr == null || cntr < e.getValue()) + if (cntr == null || cntr.get2() < e.getValue().get2()) this.cntrMap.put(e.getKey(), e.getValue()); } @@@ -1240,10 -1104,7 +1239,10 @@@ if (log.isDebugEnabled()) log.debug("Partition map after full update: " + fullMapString()); + if (changed) + cctx.shared().exchange().scheduleResendPartitions(); + - return changed ? localPartitionMap() : null; + return changed; } finally { lock.writeLock().unlock(); @@@ -1251,10 -1112,10 +1250,10 @@@ } /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - @Nullable Map<Integer, T2<Long, Long>> cntrMap) { - @Nullable Map<Integer, Long> cntrMap, ++ @Nullable Map<Integer, T2<Long, Long>> cntrMap, + boolean checkEvictions) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@@ -1270,26 -1131,21 +1269,26 @@@ try { if (stopping) - return null; + return false; if (cntrMap != null) { - for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Integer p = e.getKey(); - - Long cntr = this.cntrMap.get(p); + for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { + T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); - if (cntr == null || cntr < e.getValue()) - this.cntrMap.put(p, e.getValue()); + if (cntr == null || cntr.get2() < e.getValue().get2()) + this.cntrMap.put(e.getKey(), e.getValue()); + } - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); ++ for (int p = 0; p < locParts.length(); p++) { + GridDhtLocalPartition part = locParts.get(p); - if (part != null) - part.updateCounter(e.getValue()); + if (part == null) + continue; + + T2<Long, Long> cntr = cntrMap.get(part.id()); + + if (cntr != null && cntr.get2() > part.updateCounter()) + part.updateCounter(cntr.get2()); } } @@@ -1320,58 -1176,86 +1319,97 @@@ long updateSeq = this.updateSeq.incrementAndGet(); - node2part = new GridDhtPartitionFullMap(node2part, updateSeq); + node2part.newUpdateSequence(updateSeq); - boolean changed = false; + boolean changed = cur == null || !cur.equals(parts); - if (cur == null || !cur.equals(parts)) - changed = true; + if (changed) { + node2part.put(parts.nodeId(), parts); - node2part.put(parts.nodeId(), parts); + // Add new mappings. + for (Integer p : parts.keySet()) { + Set<UUID> ids = part2node.get(p); - part2node = new HashMap<>(part2node); + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); - // Add new mappings. - for (Integer p : parts.keySet()) { - Set<UUID> ids = part2node.get(p); + ids.add(parts.nodeId()); + } - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : cur.keySet()) { + if (parts.containsKey(p)) + continue; - changed |= ids.add(parts.nodeId()); - } + Set<UUID> ids = part2node.get(p); - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { - Set<UUID> ids = part2node.get(p); - - if (ids != null) - changed |= ids.remove(parts.nodeId()); + if (ids != null) + ids.remove(parts.nodeId()); + } } } + else + cur.updateSequence(parts.updateSequence(), parts.topologyVersion()); - if (checkEvictions) - changed |= checkEvictions(updateSeq); + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { + List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + - changed |= checkEvictions(updateSeq, aff); ++ if (checkEvictions) ++ changed |= checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff); + } consistencyCheck(); if (log.isDebugEnabled()) log.debug("Partition map after single update: " + fullMapString()); + if (changed) + cctx.shared().exchange().scheduleResendPartitions(); + - return changed ? localPartitionMap() : null; + return changed; + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param updateSeq Update sequence. + * @return {@code True} if state changed. + */ + private boolean checkEvictions(long updateSeq) { + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + + boolean changed = false; + + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { + List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + + changed = checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff); + } + + return changed; + } + + /** {@inheritDoc} */ + @Override public void checkEvictions() { + lock.writeLock().lock(); + + try { + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part.newUpdateSequence(updateSeq); + + checkEvictions(updateSeq); } finally { lock.writeLock().unlock(); @@@ -1698,24 -1387,32 +1748,34 @@@ } } - UUID locNodeId = cctx.localNodeId(); + if (node2part != null) { - GridDhtPartitionMap2 map = node2part.get(nodeId); ++ UUID locNodeId = cctx.localNodeId(); + - GridDhtPartitionMap2 map = node2part.get(locNodeId); ++ GridDhtPartitionMap2 map = node2part.get(locNodeId); - if (map == null) - node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer, - Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); - if (map == null) { - map = new GridDhtPartitionMap2(locNodeId, - updateSeq, - topVer, - Collections.<Integer, GridDhtPartitionState>emptyMap(), - false); ++ if (map == null) { ++ map = new GridDhtPartitionMap2(locNodeId, ++ updateSeq, ++ topVer, ++ Collections.<Integer, GridDhtPartitionState>emptyMap(), ++ false); + - node2part.put(locNodeId, map); - } ++ node2part.put(locNodeId, map); ++ } - map.updateSequence(updateSeq, topVer); + map.updateSequence(updateSeq, topVer); - map.put(p, state); + map.put(p, state); - Set<UUID> ids = part2node.get(p); + Set<UUID> ids = part2node.get(p); - if (ids == null) - part2node.put(p, ids = U.newHashSet(3)); + if (ids == null) + part2node.put(p, ids = U.newHashSet(3)); - ids.add(nodeId); - ids.add(locNodeId); ++ ids.add(locNodeId); + } + + return updateSeq; } /** @@@ -1807,23 -1500,26 +1863,24 @@@ } /** {@inheritDoc} */ - @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) { - @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { ++ @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) { lock.readLock().lock(); try { - return node2part.get(nodeId); - } - finally { - lock.readLock().unlock(); - } - } - Map<Integer, Long> res; ++ Map<Integer, T2<Long, Long>> res; - /** {@inheritDoc} */ - @Override public Map<Integer, T2<Long, Long>> updateCounters() { - lock.readLock().lock(); + if (skipZeros) { + res = U.newHashMap(cntrMap.size()); - try { - Map<Integer, T2<Long, Long>> res = new HashMap<>(cntrMap); - for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Long cntr = e.getValue(); - - if (ZERO.equals(cntr)) ++ for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { ++ if (ZERO.equals(e.getValue().get1()) && ZERO.equals(e.getValue().get2())) + continue; + - res.put(e.getKey(), cntr); ++ res.put(e.getKey(), e.getValue()); + } + } + else + res = new HashMap<>(cntrMap); for (int i = 0; i < locParts.length(); i++) { GridDhtLocalPartition part = locParts.get(i); @@@ -1831,11 -1527,14 +1888,14 @@@ if (part == null) continue; - Long cntr0 = res.get(part.id()); - long cntr1 = part.updateCounter(); + T2<Long, Long> cntr0 = res.get(part.id()); + Long cntr1 = part.initialUpdateCounter(); + if (skipZeros && cntr1 == 0L) + continue; + - if (cntr0 == null || cntr1 > cntr0) - res.put(part.id(), cntr1); + if (cntr0 == null || cntr1 > cntr0.get1()) + res.put(part.id(), new T2<Long, Long>(cntr1, part.updateCounter())); } return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 8096950,8aeecf8..b04dc30 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@@ -365,8 -361,12 +365,13 @@@ public final class GridDhtTxPrepareFutu boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters()); - if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM || - tx.nearOnOriginatingNode() || tx.hasInterceptor()) { + CacheObject val; + CacheObject oldVal = null; + - boolean readOld = hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM; ++ boolean readOld = hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM || ++ tx.nearOnOriginatingNode() || tx.hasInterceptor(); + + if (readOld) { cached.unswap(retVal); boolean readThrough = !txEntry.skipStore() && @@@ -381,9 -381,10 +386,9 @@@ final boolean keepBinary = txEntry.keepBinary(); - CacheObject val = cached.innerGet( + val = oldVal = cached.innerGet( null, tx, - /*swap*/true, readThrough, /*metrics*/retVal, /*event*/evt, http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index f750840,b291bd2..be32767 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@@ -2779,12 -2809,10 +2819,12 @@@ public class GridDhtAtomicCache<K, V> e * locks are released. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, AffinityTopologyVersion topVer) + private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { + ctx.shared().database().checkpointReadLock(); + - if (keys.size() == 1) { - KeyCacheObject key = keys.get(0); + if (req.size() == 1) { + KeyCacheObject key = req.key(0); while (true) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 058d4aa,efb35c4..98ab764 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@@ -23,18 -23,9 +23,14 @@@ import java.util.HashMap import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import javax.cache.processor.EntryProcessor; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; - import org.apache.ignite.internal.IgniteInternalFuture; - import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; - import org.apache.ignite.internal.processors.cache.CacheObject; - import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@@ -48,12 -35,9 +40,12 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; - import org.apache.ignite.lang.IgniteUuid; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_MAX_CONCURRENT_DHT_UPDATES; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + /** * DHT atomic cache backup update future. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index 0000000,87d9225..84c2109 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@@ -1,0 -1,1025 +1,1046 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + + import java.io.Externalizable; + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.List; + import java.util.UUID; + import javax.cache.expiry.ExpiryPolicy; + import javax.cache.processor.EntryProcessor; + import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.IgniteLogger; + import org.apache.ignite.cache.CacheWriteSynchronizationMode; + import org.apache.ignite.internal.GridDirectCollection; + import org.apache.ignite.internal.GridDirectTransient; + import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; + import org.apache.ignite.internal.processors.cache.CacheObject; + import org.apache.ignite.internal.processors.cache.GridCacheContext; + import org.apache.ignite.internal.processors.cache.GridCacheOperation; + import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; + import org.apache.ignite.internal.processors.cache.KeyCacheObject; + import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; + import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + import org.apache.ignite.internal.util.GridLongList; + import org.apache.ignite.internal.util.tostring.GridToStringInclude; + import org.apache.ignite.internal.util.typedef.F; + import org.apache.ignite.internal.util.typedef.internal.CU; + import org.apache.ignite.internal.util.typedef.internal.S; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; + import org.apache.ignite.plugin.extensions.communication.MessageReader; + import org.apache.ignite.plugin.extensions.communication.MessageWriter; + import org.jetbrains.annotations.NotNull; + import org.jetbrains.annotations.Nullable; + + import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; + import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; + import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; + + /** + * Lite DHT cache update request sent from near node to primary node. + */ + public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdateRequest { + /** */ + private static final long serialVersionUID = 0L; + + /** Target node ID. */ + @GridDirectTransient + private UUID nodeId; + + /** Future version. */ + private GridCacheVersion futVer; + + /** Update version. Set to non-null if fastMap is {@code true}. */ + private GridCacheVersion updateVer; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Write synchronization mode. */ + private CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + private GridCacheOperation op; + + /** Subject ID. */ + protected UUID subjId; + + /** Task name hash. */ + protected int taskNameHash; + + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + + /** Fast map flag. */ + protected boolean fastMap; + + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + protected boolean topLocked; + + /** Flag indicating whether request contains primary keys. */ + protected boolean hasPrimary; + + /** Skip write-through to a persistent storage. */ + protected boolean skipStore; + + /** */ + protected boolean clientReq; + + /** Keep binary flag. */ + protected boolean keepBinary; + + /** Return value flag. */ + protected boolean retval; + ++ /** */ ++ protected boolean recovery; ++ + /** Keys to update. */ + @GridToStringInclude + @GridDirectCollection(KeyCacheObject.class) + private List<KeyCacheObject> keys; + + /** Values to update. */ + @GridDirectCollection(CacheObject.class) + private List<CacheObject> vals; + + /** Partitions of keys. */ + @GridDirectCollection(int.class) + private List<Integer> partIds; + + /** Entry processors. */ + @GridDirectTransient + private List<EntryProcessor<Object, Object, Object>> entryProcessors; + + /** Entry processors bytes. */ + @GridDirectCollection(byte[].class) + private List<byte[]> entryProcessorsBytes; + + /** Conflict versions. */ + @GridDirectCollection(GridCacheVersion.class) + private List<GridCacheVersion> conflictVers; + + /** Conflict TTLs. */ + private GridLongList conflictTtls; + + /** Conflict expire times. */ + private GridLongList conflictExpireTimes; - + /** Optional arguments for entry processor. */ + @GridDirectTransient + private Object[] invokeArgs; + + /** Entry processor arguments bytes. */ + private byte[][] invokeArgsBytes; + + /** Expiry policy. */ + @GridDirectTransient + private ExpiryPolicy expiryPlc; + + /** Expiry policy bytes. */ + private byte[] expiryPlcBytes; + + /** Filter. */ + private CacheEntryPredicate[] filter; + + /** Maximum possible size of inner collections. */ + @GridDirectTransient + private int initSize; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridNearAtomicFullUpdateRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheId Cache ID. + * @param nodeId Node ID. + * @param futVer Future version. + * @param fastMap Fast map scheme flag. + * @param updateVer Update version set if fast map is performed. + * @param topVer Topology version. + * @param topLocked Topology locked flag. + * @param syncMode Synchronization mode. + * @param op Cache update operation. + * @param retval Return value required flag. + * @param expiryPlc Expiry policy. + * @param invokeArgs Optional arguments for entry processor. + * @param filter Optional filter for atomic check. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param skipStore Skip write-through to a persistent storage. + * @param keepBinary Keep binary flag. + * @param clientReq Client node request flag. + * @param addDepInfo Deployment info flag. + * @param maxEntryCnt Maximum entries count. + */ + GridNearAtomicFullUpdateRequest( + int cacheId, + UUID nodeId, + GridCacheVersion futVer, + boolean fastMap, + @Nullable GridCacheVersion updateVer, + @NotNull AffinityTopologyVersion topVer, + boolean topLocked, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + boolean retval, + @Nullable ExpiryPolicy expiryPlc, + @Nullable Object[] invokeArgs, + @Nullable CacheEntryPredicate[] filter, + @Nullable UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, ++ boolean recovery, + boolean clientReq, + boolean addDepInfo, + int maxEntryCnt + ) { + assert futVer != null; + + this.cacheId = cacheId; + this.nodeId = nodeId; + this.futVer = futVer; + this.fastMap = fastMap; + this.updateVer = updateVer; + + this.topVer = topVer; + this.topLocked = topLocked; + this.syncMode = syncMode; + this.op = op; + this.retval = retval; + this.expiryPlc = expiryPlc; + this.invokeArgs = invokeArgs; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.keepBinary = keepBinary; ++ this.recovery = recovery; + this.clientReq = clientReq; + this.addDepInfo = addDepInfo; + + // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries + // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys + // participate in request. As such, we know upper bound of all collections in request. If this bound is lower + // than 10, we use it. + initSize = Math.min(maxEntryCnt, 10); + + keys = new ArrayList<>(initSize); + + partIds = new ArrayList<>(initSize); + } + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** {@inheritDoc} */ + @Override public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override public UUID subjectId() { + return subjId; + } + + /** {@inheritDoc} */ + @Override public int taskNameHash() { + return taskNameHash; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion futureVersion() { + return futVer; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion updateVersion() { + return updateVer; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { + return syncMode; + } + + /** {@inheritDoc} */ + @Override public GridCacheOperation operation() { + return op; + } + + /** {@inheritDoc} */ + @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { + if (this.res == null) { + this.res = res; + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override @Nullable public GridNearAtomicUpdateResponse response() { + return res; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ + @Override public void addUpdateEntry(KeyCacheObject key, + @Nullable Object val, + long conflictTtl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer, + boolean primary) { + EntryProcessor<Object, Object, Object> entryProcessor = null; + + if (op == TRANSFORM) { + assert val instanceof EntryProcessor : val; + + entryProcessor = (EntryProcessor<Object, Object, Object>)val; + } + + assert val != null || op == DELETE; + + keys.add(key); + partIds.add(key.partition()); + + if (entryProcessor != null) { + if (entryProcessors == null) + entryProcessors = new ArrayList<>(initSize); + + entryProcessors.add(entryProcessor); + } + else if (val != null) { + assert val instanceof CacheObject : val; + + if (vals == null) + vals = new ArrayList<>(initSize); + + vals.add((CacheObject)val); + } + + hasPrimary |= primary; + + // In case there is no conflict, do not create the list. + if (conflictVer != null) { + if (conflictVers == null) { + conflictVers = new ArrayList<>(initSize); + + for (int i = 0; i < keys.size() - 1; i++) + conflictVers.add(null); + } + + conflictVers.add(conflictVer); + } + else if (conflictVers != null) + conflictVers.add(null); + + if (conflictTtl >= 0) { + if (conflictTtls == null) { + conflictTtls = new GridLongList(keys.size()); + + for (int i = 0; i < keys.size() - 1; i++) + conflictTtls.add(CU.TTL_NOT_CHANGED); + } + + conflictTtls.add(conflictTtl); + } + + if (conflictExpireTime >= 0) { + if (conflictExpireTimes == null) { + conflictExpireTimes = new GridLongList(keys.size()); + + for (int i = 0; i < keys.size() - 1; i++) + conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE); + } + + conflictExpireTimes.add(conflictExpireTime); + } + } + + /** {@inheritDoc} */ + @Override public List<KeyCacheObject> keys() { + return keys; + } + + /** {@inheritDoc} */ + @Override public int size() { + return keys != null ? keys.size() : 0; + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject key(int idx) { + return keys.get(idx); + } + + /** {@inheritDoc} */ + @Override public List<?> values() { + return op == TRANSFORM ? entryProcessors : vals; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheObject value(int idx) { + assert op == UPDATE : op; + + return vals.get(idx); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { + assert op == TRANSFORM : op; + + return entryProcessors.get(idx); + } + + /** {@inheritDoc} */ + @Override public CacheObject writeValue(int idx) { + if (vals != null) + return vals.get(idx); + + return null; + } + + /** {@inheritDoc} */ + @Override @Nullable public List<GridCacheVersion> conflictVersions() { + return conflictVers; + } + + /** {@inheritDoc} */ + @Override @Nullable public GridCacheVersion conflictVersion(int idx) { + if (conflictVers != null) { + assert idx >= 0 && idx < conflictVers.size(); + + return conflictVers.get(idx); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public long conflictTtl(int idx) { + if (conflictTtls != null) { + assert idx >= 0 && idx < conflictTtls.size(); + + return conflictTtls.get(idx); + } + + return CU.TTL_NOT_CHANGED; + } + + /** {@inheritDoc} */ + @Override public long conflictExpireTime(int idx) { + if (conflictExpireTimes != null) { + assert idx >= 0 && idx < conflictExpireTimes.size(); + + return conflictExpireTimes.get(idx); + } + + return CU.EXPIRE_TIME_CALCULATE; + } + + /** {@inheritDoc} */ + @Override @Nullable public Object[] invokeArguments() { + return invokeArgs; + } + + /** {@inheritDoc} */ + @Override public boolean fastMap() { + return fastMap; + } + + /** {@inheritDoc} */ + @Override public boolean topologyLocked() { + return topLocked; + } + + /** {@inheritDoc} */ + @Override public boolean clientRequest() { + return clientReq; + } + + /** {@inheritDoc} */ + @Override public boolean returnValue() { + return retval; + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return skipStore; + } + + /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return keepBinary; + } + + /** {@inheritDoc} */ + @Override public boolean hasPrimary() { + return hasPrimary; + } + + /** {@inheritDoc} */ ++ @Override public boolean recovery() { ++ return recovery; ++ } ++ ++ /** {@inheritDoc} */ + @Override @Nullable public CacheEntryPredicate[] filter() { + return filter; + } + + /** {@inheritDoc} */ + @Override public ExpiryPolicy expiry() { + return expiryPlc; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + if (expiryPlc != null && expiryPlcBytes == null) + expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); + + prepareMarshalCacheObjects(keys, cctx); + + if (filter != null) { + boolean hasFilter = false; + + for (CacheEntryPredicate p : filter) { + if (p != null) { + hasFilter = true; + + p.prepareMarshal(cctx); + } + } + + if (!hasFilter) + filter = null; + } + + if (op == TRANSFORM) { + // force addition of deployment info for entry processors if P2P is enabled globally. + if (!addDepInfo && ctx.deploymentEnabled()) + addDepInfo = true; + + if (entryProcessorsBytes == null) + entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + + if (invokeArgsBytes == null) + invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + } + else + prepareMarshalCacheObjects(vals, cctx); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + if (expiryPlcBytes != null && expiryPlc == null) + expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + finishUnmarshalCacheObjects(keys, cctx, ldr); + + if (filter != null) { + for (CacheEntryPredicate p : filter) { + if (p != null) + p.finishUnmarshal(cctx, ldr); + } + } + + if (op == TRANSFORM) { + if (entryProcessors == null) + entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + + if (invokeArgs == null) + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); + } + else + finishUnmarshalCacheObjects(vals, cctx, ldr); + + if (partIds != null && !partIds.isEmpty()) { + assert partIds.size() == keys.size(); + + for (int i = 0; i < keys.size(); i++) + keys.get(i).partition(partIds.get(i)); + } + } + + /** {@inheritDoc} */ + @Override public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeBoolean("clientReq", clientReq)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeMessage("conflictTtls", conflictTtls)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeBoolean("fastMap", fastMap)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeMessage("futVer", futVer)) + return false; + + writer.incrementState(); + + case 12: + if (!writer.writeBoolean("hasPrimary", hasPrimary)) + return false; + + writer.incrementState(); + + case 13: + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + return false; + + writer.incrementState(); + + case 14: + if (!writer.writeBoolean("keepBinary", keepBinary)) + return false; + + writer.incrementState(); + + case 15: + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 16: + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 17: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 18: - if (!writer.writeBoolean("retval", retval)) ++ if (!writer.writeBoolean("recovery", recovery)) + return false; + + writer.incrementState(); + + case 19: - if (!writer.writeBoolean("skipStore", skipStore)) ++ if (!writer.writeBoolean("retval", retval)) + return false; + + writer.incrementState(); + + case 20: - if (!writer.writeUuid("subjId", subjId)) ++ if (!writer.writeBoolean("skipStore", skipStore)) + return false; + + writer.incrementState(); + + case 21: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) ++ if (!writer.writeUuid("subjId", subjId)) + return false; + + writer.incrementState(); + + case 22: - if (!writer.writeInt("taskNameHash", taskNameHash)) ++ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 23: - if (!writer.writeBoolean("topLocked", topLocked)) ++ if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 24: - if (!writer.writeMessage("topVer", topVer)) ++ if (!writer.writeBoolean("topLocked", topLocked)) + return false; + + writer.incrementState(); + + case 25: - if (!writer.writeMessage("updateVer", updateVer)) ++ if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 26: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) ++ if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + ++ case 27: ++ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) ++ return false; ++ ++ writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + clientReq = reader.readBoolean("clientReq"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + conflictExpireTimes = reader.readMessage("conflictExpireTimes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + conflictTtls = reader.readMessage("conflictTtls"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + fastMap = reader.readBoolean("fastMap"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + futVer = reader.readMessage("futVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: + hasPrimary = reader.readBoolean("hasPrimary"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: + keepBinary = reader.readBoolean("keepBinary"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: + byte opOrd; + + opOrd = reader.readByte("op"); + + if (!reader.isLastRead()) + return false; + + op = GridCacheOperation.fromOrdinal(opOrd); + + reader.incrementState(); + + case 17: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: - retval = reader.readBoolean("retval"); ++ recovery = reader.readBoolean("recovery"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: - skipStore = reader.readBoolean("skipStore"); ++ retval = reader.readBoolean("retval"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: - subjId = reader.readUuid("subjId"); ++ skipStore = reader.readBoolean("skipStore"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 21: ++ subjId = reader.readUuid("subjId"); ++ ++ if (!reader.isLastRead()) ++ return false; ++ ++ reader.incrementState(); ++ ++ case 22: + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); + + if (!reader.isLastRead()) + return false; + + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + + reader.incrementState(); + - case 22: ++ case 23: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + - case 23: ++ case 24: + topLocked = reader.readBoolean("topLocked"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + - case 24: ++ case 25: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + - case 25: ++ case 26: + updateVer = reader.readMessage("updateVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + - case 26: ++ case 27: + vals = reader.readCollection("vals", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); - + } + + return reader.afterMessageRead(GridNearAtomicFullUpdateRequest.class); + } + + /** {@inheritDoc} */ + @Override public void cleanup(boolean clearKeys) { + vals = null; + entryProcessors = null; + entryProcessorsBytes = null; + invokeArgs = null; + invokeArgsBytes = null; + + if (clearKeys) + keys = null; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 40; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { - return 27; ++ return 28; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearAtomicFullUpdateRequest.class, this, "filter", Arrays.toString(filter), + "parent", super.toString()); + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 4860eb4,bd231cf..fd2479c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@@ -561,28 -563,94 +565,97 @@@ public class GridNearAtomicSingleUpdate throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + "left the grid)."); - GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - primary.id(), - futVer, - false, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - recovery, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - 1); + GridNearAtomicAbstractUpdateRequest req; + + if (canUseSingleRequest(primary)) { + if (op == TRANSFORM) { + req = new GridNearAtomicSingleUpdateInvokeRequest( + cctx.cacheId(), + primary.id(), + futVer, + false, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + invokeArgs, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled()); + } + else { + if (filter == null || filter.length == 0) { + req = new GridNearAtomicSingleUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + false, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + subjId, + taskNameHash, + skipStore, + keepBinary, ++ recovery, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled()); + } + else { + req = new GridNearAtomicSingleUpdateFilterRequest( + cctx.cacheId(), + primary.id(), + futVer, + false, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, ++ recovery, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled()); + } + } + } + else { + req = new GridNearAtomicFullUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + false, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, ++ recovery, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + } req.addUpdateEntry(cacheKey, val, http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 345e0d1,6e69161..f1e2c01 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@@ -19,11 -19,10 +19,12 @@@ package org.apache.ignite.internal.proc import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Map; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; + import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6282d03,4f34401..d2b893f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -1101,8 -984,10 +1068,8 @@@ public class GridDhtPartitionsExchangeF * @throws IgniteCheckedException If failed. */ private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage m = createPartitionsMessage(); + GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true); - assert !nodes.contains(cctx.localNode()); - if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + m + ']'); @@@ -1553,32 -1265,15 +1557,37 @@@ if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff); + cacheCtx.topology().beforeExchange(this, !centralizedAff); + } + } + + if (discoEvt.type() == EVT_NODE_JOINED) { + if (cctx.cache().globalState() != CacheState.INACTIVE) + assignPartitionsStates(); + } + else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { + assert discoEvt instanceof DiscoveryCustomEvent; + + if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt) + .customMessage(); + + for (DynamicCacheChangeRequest req : batch.requests()) { + if (req.resetLostPartitions()) + resetLostPartitions(); + else if (req.globalStateChange() && req.state() != CacheState.INACTIVE) + assignPartitionsStates(); + } } } + else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) + detectLostPartitions(); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) + cacheCtx.topology().checkEvictions(); + } + updateLastVersion(cctx.versions().last()); cctx.versions().onExchange(lastVer.get().order()); @@@ -1944,7 -1613,9 +1953,9 @@@ if (crd0.isLocal()) { if (allReceived) { + awaitSingleMapUpdates(); + - onAllReceived(true); + onAllReceived(); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 3d91468,90d6242..5a30f95 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@@ -26,11 -29,12 +29,13 @@@ import org.apache.ignite.internal.GridD import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; + import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 5e4b1c4,bf08f0a..0975a07 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@@ -23,13 -23,16 +23,17 @@@ import java.util.Collections import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; + import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.T2; + import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index deb1731,41bc2fc..cf02071 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@@ -35,10 -34,9 +34,10 @@@ import org.apache.ignite.internal.Ignit import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; + import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; - import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; +import org.apache.ignite.internal.processors.cache.CacheState; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@@ -412,9 -412,12 +413,13 @@@ public class GridDhtPreloader extends G } /** {@inheritDoc} */ - @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload, Collection<String> caches, int cnt, @Nullable GridFutureAdapter<Boolean> forcedRebFut) { - return demander.addAssignments(assignments, forcePreload, caches, cnt, forcedRebFut); + @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, + boolean forceRebalance, ++ Collection<String> caches, + int cnt, + Runnable next, + @Nullable GridFutureAdapter<Boolean> forcedRebFut) { + return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut); } /** @@@ -763,12 -790,23 +787,27 @@@ try { GridDhtLocalPartition part = partsToEvict.poll(); - if (part != null) + if (part != null) { - part.tryEvict(); + try { + part.tryEvict(); + } + catch (Throwable ex) { + if (cctx.kernalContext().isStopping()) { + LT.warn(log, ex, "Partition eviction failed (current node is stopping).", + false, + true); + + partsToEvict.clear(); + + return true; + } + else + LT.error(log, ex, "Partition eviction failed, this can cause grid hang."); + } + + if (part.state() != EVICTED) + partsToEvict.push(part); + } } finally { if (!partsToEvict.isEmptyx()) http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index b4eeb11,d4decb4..e62bf60 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@@ -26,8 -26,10 +26,9 @@@ import java.util.ArrayDeque import java.util.ArrayList; import java.util.Collection; import java.util.Collections; + import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@@ -52,9 -55,9 +54,10 @@@ import org.apache.ignite.events.Event import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; @@@ -80,8 -83,10 +83,9 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.task.GridInternal; + import org.apache.ignite.internal.util.GridBoundedPriorityQueue; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; -import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.GridSpiCloseableIteratorWrapper; import org.apache.ignite.internal.util.GridSpinBusyLock; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java index 31ad653,bb769c9..56cf271 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java @@@ -155,17 -157,17 +156,17 @@@ public class GridCacheSqlQuery implemen assert paramsBytes != null; try { - final ClassLoader ldr = U.resolveClassLoader(ctx.config()); + final ClassLoader ldr = U.resolveClassLoader(ctx.config()); - if (m instanceof BinaryMarshaller) - // To avoid deserializing of enum types. - params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr); - else - params = m.unmarshal(paramsBytes, ldr); - } + if (m instanceof BinaryMarshaller) + // To avoid deserializing of enum types. + params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr); + else + params = U.unmarshal(m, paramsBytes, ldr); + } catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + throw new IgniteException(e); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 064b725,e2fbf52..d2828e0 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@@ -56,8 -56,7 +56,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; - import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; + import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.tostring.GridToStringInclude; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 762bf13,2706d4d..662a905 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@@ -1410,20 -1495,19 +1504,20 @@@ public class IgniteTxHandler /*expiryPlc*/null, /*keepBinary*/true); - if (val == null) - val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key())); + if (val == null) + val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key())); - if (val != null) - entry.readValue(val); + if (val != null) + entry.readValue(val); - break; - } - catch (GridCacheEntryRemovedException e) { - if (log.isDebugEnabled()) - log.debug("Got entry removed exception, will retry: " + entry.txKey()); + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got entry removed exception, will retry: " + entry.txKey()); - entry.cached(null); - entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion())); ++ entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion())); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 86994b5,ba44655..95fa006 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@@ -2458,11 -2418,11 +2449,14 @@@ public abstract class IgniteTxLocalAdap } } else - old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); + old = entry.rawGet(); + + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { ret.set(cacheCtx, old, false, keepBinary); http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ----------------------------------------------------------------------
