http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index deb9ce4..2826215 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -18,26 +18,65 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; +import java.nio.ByteBuffer; import java.util.UUID; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** * */ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { + /** Skip store flag bit mask. */ + private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01; + + /** Keep binary flag. */ + private static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02; + + /** Near cache key flag. */ + private static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04; + + /** */ + static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08; + + /** */ + private static final int DHT_ATOMIC_REPLY_WITHOUT_DELAY = 0x10; + /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); + /** Future ID on primary. */ + protected long futId; + + /** Write version. */ + protected GridCacheVersion writeVer; + + /** Write synchronization mode. */ + protected CacheWriteSynchronizationMode syncMode; + + /** Topology version. */ + protected AffinityTopologyVersion topVer; + + /** Subject ID. */ + protected UUID subjId; + + /** Task name hash. */ + protected int taskNameHash; + /** Node ID. */ @GridDirectTransient protected UUID nodeId; @@ -46,6 +85,15 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag @GridDirectTransient private boolean onRes; + /** */ + private UUID nearNodeId; + + /** */ + private long nearFutId; + + /** Additional flags. */ + protected byte flags; + /** * Empty constructor required by {@link Externalizable}. */ @@ -59,9 +107,68 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag * @param cacheId Cache ID. * @param nodeId Node ID. */ - protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) { + protected GridDhtAtomicAbstractUpdateRequest(int cacheId, + UUID nodeId, + long futId, + GridCacheVersion writeVer, + CacheWriteSynchronizationMode syncMode, + @NotNull AffinityTopologyVersion topVer, + UUID subjId, + int taskNameHash, + boolean addDepInfo, + boolean keepBinary, + boolean skipStore + ) { this.cacheId = cacheId; this.nodeId = nodeId; + this.futId = futId; + this.writeVer = writeVer; + this.syncMode = syncMode; + this.topVer = topVer; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.addDepInfo = addDepInfo; + + if (skipStore) + setFlag(true, DHT_ATOMIC_SKIP_STORE_FLAG_MASK); + if (keepBinary) + setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK); + } + + void nearReplyInfo(UUID nearNodeId, long nearFutId) { + assert nearNodeId != null; + + this.nearNodeId = nearNodeId; + this.nearFutId = nearFutId; + } + + boolean replyWithoutDelay() { + return isFlag(DHT_ATOMIC_REPLY_WITHOUT_DELAY); + } + + void replyWithoutDelay(boolean replyWithoutDelay) { + setFlag(replyWithoutDelay, DHT_ATOMIC_REPLY_WITHOUT_DELAY); + } + + /** + * @param res Result flag. + */ + void hasResult(boolean res) { + setFlag(res, DHT_ATOMIC_HAS_RESULT_MASK); + } + + /** + * @return Result flag. + */ + private boolean hasResult() { + return isFlag(DHT_ATOMIC_HAS_RESULT_MASK); + } + + /** + * @return Near node ID. + */ + public UUID nearNodeId() { + return nearNodeId; } /** {@inheritDoc} */ @@ -77,14 +184,25 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag } /** + * @return Flags. + */ + public final byte flags() { + return flags; + } + + /** * @return Keep binary flag. */ - public abstract boolean keepBinary(); + public final boolean keepBinary() { + return isFlag(DHT_ATOMIC_KEEP_BINARY_FLAG_MASK); + } /** * @return Skip write-through to a persistent storage. */ - public abstract boolean skipStore(); + public final boolean skipStore() { + return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK); + } /** * @return {@code True} if on response flag changed. @@ -93,6 +211,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag return !onRes && (onRes = true); } + /** + * @return {@code True} if response was received. + */ + boolean hasResponse() { + return onRes; + } + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return addDepInfo; @@ -121,7 +246,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). * @param addPrevVal If {@code true} adds previous value. - * @param partId Partition. * @param prevVal Previous value. * @param updateCntr Update counter. */ @@ -132,7 +256,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag long conflictExpireTime, @Nullable GridCacheVersion conflictVer, boolean addPrevVal, - int partId, @Nullable CacheObject prevVal, long updateCntr ); @@ -158,27 +281,44 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag /** * @return Subject ID. */ - public abstract UUID subjectId(); + public final UUID subjectId() { + return subjId; + } /** * @return Task name. */ - public abstract int taskNameHash(); + public final int taskNameHash() { + return taskNameHash; + } + + /** + * @return Future ID on primary node. + */ + public final long futureId() { + return futId; + } /** - * @return Version assigned on primary node. + * @return Future ID on near node. */ - public abstract GridCacheVersion futureVersion(); + public final long nearFutureId() { + return nearFutId; + } /** * @return Write version. */ - public abstract GridCacheVersion writeVersion(); + public final GridCacheVersion writeVersion() { + return writeVer; + } /** * @return Cache write synchronization mode. */ - public abstract CacheWriteSynchronizationMode writeSynchronizationMode(); + public final CacheWriteSynchronizationMode writeSynchronizationMode() { + return syncMode; + } /** * @return Keys size. @@ -203,12 +343,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag public abstract KeyCacheObject key(int idx); /** - * @param idx Partition index. - * @return Partition id. - */ - public abstract int partitionId(int idx); - - /** * @param updCntr Update counter. * @return Update counter. */ @@ -284,4 +418,228 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag * @return Optional arguments for entry processor. */ @Nullable public abstract Object[] invokeArguments(); + + /** + * @return {@code True} if near cache update request. + */ + protected final boolean near() { + return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK); + } + + /** + * @param near Near cache update flag. + */ + protected final void near(boolean near) { + setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK); + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + protected final void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reags flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + final boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 12; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeLong("nearFutId", nearFutId)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeUuid("nearNodeId", nearNodeId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeUuid("subjId", subjId)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeMessage("writeVer", writeVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + nearFutId = reader.readLong("nearFutId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + nearNodeId = reader.readUuid("nearNodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); + + if (!reader.isLastRead()) + return false; + + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + + reader.incrementState(); + + case 9: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + writeVer = reader.readMessage("writeVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder flags = new StringBuilder(); + + if (skipStore()) + appendFlag(flags, "skipStore"); + if (keepBinary()) + appendFlag(flags, "keepBinary"); + if (near()) + appendFlag(flags, "near"); + if (hasResult()) + appendFlag(flags, "hasRes"); + if (replyWithoutDelay()) + appendFlag(flags, "resNoDelay"); + + return S.toString(GridDhtAtomicAbstractUpdateRequest.class, this, + "flags", flags.toString()); + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cebf4ae..c20ed48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; @@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; -import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -80,6 +80,8 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -90,8 +92,6 @@ import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -99,16 +99,14 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; @@ -134,12 +132,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT = Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500); + /** */ + private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes = + new ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>>() { + @Override protected Map<UUID, GridDhtAtomicDeferredUpdateResponse> initialValue() { + return new HashMap<>(); + } + }; + /** Update reply closure. */ @GridToStringExclude - private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; - - /** Pending */ - private GridDeferredAckMessageSender deferredUpdateMsgSnd; + private UpdateReplyClosure updateReplyClos; /** */ private GridNearAtomicCache<K, V> near; @@ -205,25 +208,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override protected void init() { super.init(); - updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() { + updateReplyClos = new UpdateReplyClosure() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { - if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { - assert req.writeSynchronizationMode() != FULL_ASYNC : req; - - // Always send reply in CLOCK ordering mode. - sendNearUpdateReply(res.nodeId(), res); - - return; - } - - // Request should be for primary keys only in PRIMARY ordering mode. - assert req.hasPrimary() : req; - if (req.writeSynchronizationMode() != FULL_ASYNC) sendNearUpdateReply(res.nodeId(), res); else { - if (!F.isEmpty(res.remapKeys())) + if (res.remapTopologyVersion() != null) // Remap keys on primary node in FULL_ASYNC mode. remapToNewPrimary(req); else if (res.error() != null) { @@ -240,53 +231,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public void start() throws IgniteCheckedException { super.start(); - deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) { - @Override public int getTimeout() { - return DEFERRED_UPDATE_RESPONSE_TIMEOUT; - } - - @Override public int getBufferSize() { - return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE; - } - - @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) { - GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), - vers, ctx.deploymentEnabled()); - - try { - ctx.kernalContext().gateway().readLock(); - - try { - ctx.io().send(nodeId, msg, ctx.ioPolicy()); - - if (msgLog.isDebugEnabled()) { - msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() + - ", node=" + nodeId + ']'); - } - } - finally { - ctx.kernalContext().gateway().readUnlock(); - } - } - catch (IllegalStateException ignored) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Failed to send deferred DHT update response, node is stopping [" + - "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Failed to send deferred DHT update response, node left [" + - "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send deferred DHT update response to remote node [" + - "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e); - } - } - }; - CacheMetricsImpl m = new CacheMetricsImpl(ctx); if (ctx.dht().near() != null) @@ -419,6 +363,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addHandler(ctx.cacheId(), + GridDhtAtomicNearResponse.class, + new CI2<UUID, GridDhtAtomicNearResponse>() { + @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { + processDhtAtomicNearResponse(uuid, msg); + } + + @Override public String toString() { + return "GridDhtAtomicNearResponse handler " + + "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']'; + } + }); + + ctx.io().addHandler(ctx.cacheId(), + GridNearAtomicCheckUpdateRequest.class, + new CI2<UUID, GridNearAtomicCheckUpdateRequest>() { + @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) { + processCheckUpdateRequest(uuid, msg); + } + + @Override public String toString() { + return "GridNearAtomicCheckUpdateRequest handler " + + "[msgIdx=" + GridNearAtomicCheckUpdateRequest.CACHE_MSG_IDX + ']'; + } + }); + if (near == null) { ctx.io().addHandler( ctx.cacheId(), @@ -450,11 +420,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - /** {@inheritDoc} */ - @Override public void stop() { - deferredUpdateMsgSnd.stop(); - } - /** * @param near Near cache. */ @@ -1341,9 +1306,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheEntryPredicate[] filters = CU.filterArray(filter); - if (conflictPutVal == null && - conflictRmvVer == null && - !isFastMap(filters, op)) { + if (conflictPutVal == null && conflictRmvVer == null) { return new GridNearAtomicSingleUpdateFuture( ctx, this, @@ -1389,19 +1352,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** - * Whether this is fast-map operation. - * - * @param filters Filters. - * @param op Operation. - * @return {@code True} if fast-map. - */ - public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) { - return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC && - ctx.config().getAtomicWriteOrderMode() == CLOCK && - !(ctx.writeThrough() && ctx.config().getInterceptor() != null); - } - - /** * Entry point for all public API remove methods. * * @param keys Keys to remove. @@ -1696,10 +1646,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Update request. * @param completionCb Completion callback. */ - public void updateAllAsyncInternal( + void updateAllAsyncInternal( final UUID nodeId, final GridNearAtomicAbstractUpdateRequest req, - final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb + final UpdateReplyClosure completionCb ) { IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion()); @@ -1748,12 +1698,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private void onForceKeysError(final UUID nodeId, final GridNearAtomicAbstractUpdateRequest req, - final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + final UpdateReplyClosure completionCb, IgniteCheckedException e ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, - req.futureVersion(), + req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.addFailedKeys(req.keys(), e); @@ -1771,12 +1723,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private void updateAllAsyncInternal0( UUID nodeId, GridNearAtomicAbstractUpdateRequest req, - CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb + UpdateReplyClosure completionCb ) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), - ctx.deploymentEnabled()); + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + U.warn(msgLog, "Skip near update request, node originated update request left [" + + "futId=" + req.futureId() + ", node=" + nodeId + ']'); - res.partition(req.partition()); + return; + } + + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + nodeId, + req.futureId(), + req.partition(), + false, + ctx.deploymentEnabled()); assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); @@ -1791,7 +1754,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. - List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion()); + List<GridDhtCacheEntry> locked = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; @@ -1810,43 +1773,29 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } - // Do not check topology version for CLOCK versioning since - // partition exchange will wait for near update future (if future is on server node). - // Also do not check topology version if topology was locked on near node by + // Do not check topology version if topology was locked on near node by // external transaction or explicit lock. - if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || - !needRemap(req.topologyVersion(), top.topologyVersion())) { - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) { - U.warn(msgLog, "Skip near update request, node originated update request left [" + - "futId=" + req.futureVersion() + ", node=" + nodeId + ']'); - - return; - } + if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { + locked = lockEntries(req, req.topologyVersion()); boolean hasNear = ctx.discovery().cacheNearNode(node, name()); - GridCacheVersion ver = req.updateVersion(); + // Assign next version for update inside entries lock. + GridCacheVersion ver = ctx.versions().next(top.topologyVersion()); - if (ver == null) { - // Assign next version for update inside entries lock. - ver = ctx.versions().next(top.topologyVersion()); + if (hasNear) + res.nearVersion(ver); - if (hasNear) - res.nearVersion(ver); - - if (msgLog.isDebugEnabled()) { - msgLog.debug("Assigned update version [futId=" + req.futureVersion() + - ", writeVer=" + ver + ']'); - } + if (msgLog.isDebugEnabled()) { + msgLog.debug("Assigned update version [futId=" + req.futureId() + + ", writeVer=" + ver + ']'); } assert ver != null : "Got null version for update request: " + req; boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); - dhtFut = createDhtFuture(ver, req, res, completionCb, false); + dhtFut = createDhtFuture(ver, req); expiry = expiryPolicy(req.expiry()); @@ -1866,7 +1815,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { locked, ver, dhtFut, - completionCb, ctx.isDrEnabled(), taskName, expiry, @@ -1886,7 +1834,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { locked, ver, dhtFut, - completionCb, ctx.isDrEnabled(), taskName, expiry, @@ -1902,15 +1849,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.returnValue(retVal); - if (req.writeSynchronizationMode() != FULL_ASYNC) - req.cleanup(!node.isLocal()); - if (dhtFut != null) - ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut); + ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut); } - else + else { // Should remap all keys. remap = true; + + res.remapTopologyVersion(top.topologyVersion()); + } } finally { top.readUnlock(); @@ -1936,12 +1883,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (GridDhtInvalidPartitionException ignore) { - assert !req.fastMap() || req.clientRequest() : req; - if (log.isDebugEnabled()) log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); remap = true; + + res.remapTopologyVersion(ctx.topology().topologyVersion()); } catch (Throwable e) { // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is @@ -1961,18 +1908,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (remap) { assert dhtFut == null; - res.remapKeys(req.keys()); - completionCb.apply(req, res); } - else { - // If there are backups, map backup update future. + else if (dhtFut != null) - dhtFut.map(); - // Otherwise, complete the call. - else - completionCb.apply(req, res); - } + dhtFut.map(node, res.returnValue(), res, completionCb); + + if (req.writeSynchronizationMode() != FULL_ASYNC) + req.cleanup(!node.isLocal()); sendTtlUpdateRequest(expiry); } @@ -1987,7 +1930,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param locked Locked entries. * @param ver Assigned version. * @param dhtFut Optional DHT future. - * @param completionCb Completion callback to invoke when DHT future is completed. * @param replicate Whether replication is enabled. * @param taskName Task name. * @param expiry Expiry policy. @@ -2004,7 +1946,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final List<GridDhtCacheEntry> locked, final GridCacheVersion ver, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, final boolean replicate, final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, @@ -2049,9 +1990,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { for (int i = 0; i < locked.size(); i++) { GridDhtCacheEntry entry = locked.get(i); - if (entry == null) - continue; - try { if (!checkFilter(entry, req, res)) { if (expiry != null && entry.hasValue()) { @@ -2155,7 +2093,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, entryProcessorMap, dhtFut, - completionCb, req, res, replicate, @@ -2204,7 +2141,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { rmvKeys, entryProcessorMap, dhtFut, - completionCb, req, res, replicate, @@ -2331,7 +2267,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { rmvKeys, entryProcessorMap, dhtFut, - completionCb, req, res, replicate, @@ -2404,14 +2339,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Updates locked entries one-by-one. * - * @param node Originating node. + * @param nearNode Originating node. * @param hasNear {@code True} if originating node has near cache. * @param req Update request. * @param res Update response. * @param locked Locked entries. * @param ver Assigned update version. * @param dhtFut Optional DHT future. - * @param completionCb Completion callback to invoke when DHT future is completed. * @param replicate Whether DR is enabled for that cache. * @param taskName Task name. * @param expiry Expiry policy. @@ -2420,14 +2354,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @throws GridCacheEntryRemovedException Should be never thrown. */ private UpdateSingleResult updateSingle( - ClusterNode node, + ClusterNode nearNode, boolean hasNear, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, List<GridDhtCacheEntry> locked, GridCacheVersion ver, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -2440,10 +2373,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer); - boolean readersOnly = false; - boolean intercept = ctx.config().getInterceptor() != null; + AffinityAssignment affAssignment = ctx.affinity().assignment(topVer); + // Avoid iterator creation. for (int i = 0; i < req.size(); i++) { KeyCacheObject k = req.key(i); @@ -2455,18 +2388,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { GridDhtCacheEntry entry = locked.get(i); - if (entry == null) - continue; - GridCacheVersion newConflictVer = req.conflictVersion(i); long newConflictTtl = req.conflictTtl(i); long newConflictExpireTime = req.conflictExpireTime(i); assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; - boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(), - req.topologyVersion()); - Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i); Collection<UUID> readers = null; @@ -2474,46 +2401,39 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (checkReaders) { readers = entry.readers(); - filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); + filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id())); } GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, - node.id(), + nearNode.id(), locNodeId, op, writeVal, req.invokeArguments(), - (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())) - && writeThrough() && !req.skipStore(), + writeThrough() && !req.skipStore(), !req.skipStore(), sndPrevVal || req.returnValue(), req.keepBinary(), expiry, - true, - true, - primary, - ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. + /*event*/true, + /*metrics*/true, + /*primary*/true, + /*verCheck*/false, topVer, req.filter(), - replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, + replicate ? DR_PRIMARY : DR_NONE, newConflictTtl, newConflictExpireTime, newConflictVer, - true, + /*conflictResolve*/true, intercept, req.subjectId(), taskName, - null, - null, + /*prevVal*/null, + /*updateCntr*/null, dhtFut); - if (dhtFut == null && !F.isEmpty(filteredReaders)) { - dhtFut = createDhtFuture(ver, req, res, completionCb, true); - - readersOnly = true; - } - if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); @@ -2525,20 +2445,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { EntryProcessor<Object, Object, Object> entryProcessor = null; - if (!readersOnly) { - dhtFut.addWriteEntry(entry, - updRes.newValue(), - entryProcessor, - updRes.newTtl(), - updRes.conflictExpireTime(), - newConflictVer, - sndPrevVal, - updRes.oldValue(), - updRes.updateCounter()); - } + dhtFut.addWriteEntry( + affAssignment, + entry, + updRes.newValue(), + entryProcessor, + updRes.newTtl(), + updRes.conflictExpireTime(), + newConflictVer, + sndPrevVal, + updRes.oldValue(), + updRes.updateCounter()); if (!F.isEmpty(filteredReaders)) - dhtFut.addNearWriteEntries(filteredReaders, + dhtFut.addNearWriteEntries( + filteredReaders, entry, updRes.newValue(), entryProcessor, @@ -2553,8 +2474,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (hasNear) { - if (primary && updRes.sendToDht()) { - if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) { + if (updRes.sendToDht()) { + if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) { // If put the same value as in request then do not need to send it back. if (op == TRANSFORM || writeVal != updRes.newValue()) { res.addNearValue(i, @@ -2566,13 +2487,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime()); if (updRes.newValue() != null) { - IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer); assert f == null : f; } } - else if (F.contains(readers, node.id())) // Reader became primary or backup. - entry.removeReader(node.id(), req.messageId()); + else if (F.contains(readers, nearNode.id())) // Reader became primary or backup. + entry.removeReader(nearNode.id(), req.messageId()); else res.addSkippedIndex(i); } @@ -2594,7 +2515,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) { if (retVal == null) - retVal = new GridCacheReturn(node.isLocal()); + retVal = new GridCacheReturn(nearNode.isLocal()); retVal.addEntryProcessResult(ctx, k, @@ -2610,7 +2531,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheObject ret = updRes.oldValue(); retVal = new GridCacheReturn(ctx, - node.isLocal(), + nearNode.isLocal(), req.keepBinary(), req.returnValue() ? ret : null, updRes.success()); @@ -2630,13 +2551,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param firstEntryIdx Index of the first entry in the request keys collection. * @param entries Entries to update. * @param ver Version to set. - * @param node Originating node. + * @param nearNode Originating node. * @param writeVals Write values. * @param putMap Values to put. * @param rmvKeys Keys to remove. * @param entryProcessorMap Entry processors. * @param dhtFut DHT update future if has backups. - * @param completionCb Completion callback to invoke when DHT future is completed. * @param req Request. * @param res Response. * @param replicate Whether replication is enabled. @@ -2652,13 +2572,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final int firstEntryIdx, final List<GridDhtCacheEntry> entries, final GridCacheVersion ver, - final ClusterNode node, + final ClusterNode nearNode, @Nullable final List<CacheObject> writeVals, @Nullable final Map<KeyCacheObject, CacheObject> putMap, @Nullable final Collection<KeyCacheObject> rmvKeys, @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final boolean replicate, @@ -2681,17 +2600,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheOperation op; if (putMap != null) { - // If fast mapping, filter primary keys for write to store. - Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ? - F.view(putMap, new P1<CacheObject>() { - @Override public boolean apply(CacheObject key) { - return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion()); - } - }) : - putMap; - try { - ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() { + ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() { @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) { return F.t(v, ver); } @@ -2704,17 +2614,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op = UPDATE; } else { - // If fast mapping, filter primary keys for write to store. - Collection<KeyCacheObject> storeKeys = req.fastMap() ? - F.view(rmvKeys, new P1<Object>() { - @Override public boolean apply(Object key) { - return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion()); - } - }) : - rmvKeys; - try { - ctx.store().removeAll(null, storeKeys); + ctx.store().removeAll(null, rmvKeys); } catch (CacheStorePartialUpdateException e) { storeErr = e; @@ -2725,6 +2626,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; + AffinityAssignment affAssignment = ctx.affinity().assignment(topVer); + // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry entry = entries.get(i); @@ -2747,21 +2650,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert writeVal != null || op == DELETE : "null write value found."; - boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), - entry.partition(), - req.topologyVersion()); - Collection<UUID> readers = null; Collection<UUID> filteredReaders = null; if (checkReaders) { readers = entry.readers(); - filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); + filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id())); } GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, - node.id(), + nearNode.id(), locNodeId, op, writeVal, @@ -2773,11 +2672,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiry, /*event*/true, /*metrics*/true, - primary, - ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. + /*primary*/true, + /*verCheck*/false, topVer, null, - replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, + replicate ? DR_PRIMARY : DR_NONE, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE, null, @@ -2811,30 +2710,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { batchRes.addDeleted(entry, updRes, entries); - if (dhtFut == null && !F.isEmpty(filteredReaders)) { - dhtFut = createDhtFuture(ver, req, res, completionCb, true); - - batchRes.readersOnly(true); - } - if (dhtFut != null) { EntryProcessor<Object, Object, Object> entryProcessor = entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); - if (!batchRes.readersOnly()) { - dhtFut.addWriteEntry(entry, - writeVal, - entryProcessor, - updRes.newTtl(), - CU.EXPIRE_TIME_CALCULATE, - null, - sndPrevVal, - updRes.oldValue(), - updRes.updateCounter()); - } + dhtFut.addWriteEntry( + affAssignment, + entry, + writeVal, + entryProcessor, + updRes.newTtl(), + CU.EXPIRE_TIME_CALCULATE, + null, + sndPrevVal, + updRes.oldValue(), + updRes.updateCounter()); if (!F.isEmpty(filteredReaders)) - dhtFut.addNearWriteEntries(filteredReaders, + dhtFut.addNearWriteEntries( + filteredReaders, entry, writeVal, entryProcessor, @@ -2843,30 +2737,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (hasNear) { - if (primary) { - if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) { - int idx = firstEntryIdx + i; - - if (req.operation() == TRANSFORM) { - res.addNearValue(idx, - writeVal, - updRes.newTtl(), - CU.EXPIRE_TIME_CALCULATE); - } - else - res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); + if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) { + int idx = firstEntryIdx + i; - if (writeVal != null || entry.hasValue()) { - IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); - - assert f == null : f; - } + if (req.operation() == TRANSFORM) { + res.addNearValue(idx, + writeVal, + updRes.newTtl(), + CU.EXPIRE_TIME_CALCULATE); } - else if (readers.contains(node.id())) // Reader became primary or backup. - entry.removeReader(node.id(), req.messageId()); else - res.addSkippedIndex(firstEntryIdx + i); + res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); + + if (writeVal != null || entry.hasValue()) { + IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer); + + assert f == null : f; + } } + else if (readers.contains(nearNode.id())) // Reader became primary or backup. + entry.removeReader(nearNode.id(), req.messageId()); else res.addSkippedIndex(firstEntryIdx + i); } @@ -2879,7 +2769,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (IgniteCheckedException e) { - res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, ctx); + res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e); } if (storeErr != null) { @@ -2888,7 +2778,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { for (Object failedKey : storeErr.failedKeys()) failed.add(ctx.toCacheKeyObject(failedKey)); - res.addFailedKeys(failed, storeErr.getCause(), ctx); + res.addFailedKeys(failed, storeErr.getCause()); } return dhtFut; @@ -2910,23 +2800,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { KeyCacheObject key = req.key(0); while (true) { - try { - GridDhtCacheEntry entry = entryExx(key, topVer); + GridDhtCacheEntry entry = entryExx(key, topVer); - GridUnsafe.monitorEnter(entry); + GridUnsafe.monitorEnter(entry); - if (entry.obsolete()) - GridUnsafe.monitorExit(entry); - else - return Collections.singletonList(entry); - } - catch (GridDhtInvalidPartitionException e) { - // Ignore invalid partition exception in CLOCK ordering mode. - if (ctx.config().getAtomicWriteOrderMode() == CLOCK) - return Collections.singletonList(null); - else - throw e; - } + if (entry.obsolete()) + GridUnsafe.monitorExit(entry); + else + return Collections.singletonList(entry); } } else { @@ -2934,18 +2815,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { while (true) { for (int i = 0; i < req.size(); i++) { - try { - GridDhtCacheEntry entry = entryExx(req.key(i), topVer); + GridDhtCacheEntry entry = entryExx(req.key(i), topVer); - locked.add(entry); - } - catch (GridDhtInvalidPartitionException e) { - // Ignore invalid partition exception in CLOCK ordering mode. - if (ctx.config().getAtomicWriteOrderMode() == CLOCK) - locked.add(null); - else - throw e; - } + locked.add(entry); } boolean retry = false; @@ -3055,7 +2927,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param req Request to remap. */ - private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) { + void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) { assert req.writeSynchronizationMode() == FULL_ASYNC : req; if (log.isDebugEnabled()) @@ -3098,7 +2970,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drPutVals = null; } - final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( + GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( ctx, this, ctx.config().getWriteSynchronizationMode(), @@ -3127,43 +2999,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * * @param writeVer Write version. * @param updateReq Update request. - * @param updateRes Update response. - * @param completionCb Completion callback to invoke when future is completed. - * @param force If {@code true} then creates future without optimizations checks. - * @return Backup update future or {@code null} if there are no backups. + * @return Backup update future. */ - @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture( + private GridDhtAtomicAbstractUpdateFuture createDhtFuture( GridCacheVersion writeVer, - GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes, - CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, - boolean force + GridNearAtomicAbstractUpdateRequest updateReq ) { - if (!force) { - if (updateReq.fastMap()) - return null; - - AffinityTopologyVersion topVer = updateReq.topologyVersion(); - - Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(ctx.cacheId(), topVer); - - // We are on primary node for some key. - assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer + - ctx.kernalContext().discovery().discoCache(topVer) + ']'; - - if (nodes.size() == 1) { - if (log.isDebugEnabled()) - log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " + - "[topVer=" + topVer + ", updateReq=" + updateReq + ']'); - - return null; - } - } - if (updateReq.size() == 1) - return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); + return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq); else - return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); + return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq); } /** @@ -3172,13 +3017,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (msgLog.isDebugEnabled()) { - msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + + msgLog.debug("Received near atomic update request [futId=" + req.futureId() + ", node=" + nodeId + ']'); } - req.nodeId(ctx.localNodeId()); - updateAllAsyncInternal(nodeId, req, updateReplyClos); } @@ -3189,20 +3031,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @SuppressWarnings("unchecked") private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { if (msgLog.isDebugEnabled()) - msgLog.debug("Received near atomic update response " + - "[futId=" + res.futureVersion() + ", node=" + nodeId + ']'); + msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']'); res.nodeId(ctx.localNodeId()); GridNearAtomicAbstractUpdateFuture fut = - (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); if (fut != null) - fut.onResult(nodeId, res, false); - + fut.onPrimaryResponse(nodeId, res, false); else U.warn(msgLog, "Failed to find near update future for update response (will ignore) " + - "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']'); + "[futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']'); + } + + /** + * @param nodeId Node ID. + * @param checkReq Request. + */ + private void processCheckUpdateRequest(UUID nodeId, GridNearAtomicCheckUpdateRequest checkReq) { + /* + * Message is processed in the same stripe, so primary already processed update request. It is possible + * response was not sent if operation result was empty. Near node will get original response or this one. + */ + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + nodeId, + checkReq.futureId(), + checkReq.partition(), + false, + false); + + GridCacheReturn ret = new GridCacheReturn(false, true); + + res.returnValue(ret); + + sendNearUpdateReply(nodeId, res); } /** @@ -3210,20 +3073,28 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Dht atomic update request. */ private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) { + assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName(); + if (msgLog.isDebugEnabled()) { - msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() + + msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); } + assert req.partition() >= 0 : req; + GridCacheVersion ver = req.writeVersion(); - // Always send update reply. - GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(), - ctx.deploymentEnabled()); + GridDhtAtomicNearResponse nearRes = null; - res.partition(req.partition()); + if (req.nearNodeId() != null) { + nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), + req.partition(), + req.nearFutureId(), + nodeId, + req.flags()); + } - Boolean replicate = ctx.isDrEnabled(); + boolean replicate = ctx.isDrEnabled(); boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; @@ -3305,48 +3176,208 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Ignore. } catch (IgniteCheckedException e) { - res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e)); + IgniteCheckedException err = + new IgniteCheckedException("Failed to update key on backup node: " + key, e); + + if (nearRes != null) + nearRes.addFailedKey(key, err); + + U.error(log, "Failed to update key on backup node: " + key, e); + } + } + + GridDhtAtomicUpdateResponse dhtRes = null; + + if (isNearEnabled(cacheCfg)) { + List<KeyCacheObject> nearEvicted = + ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes); + + if (nearEvicted != null) { + dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), + req.partition(), + req.futureId(), + ctx.deploymentEnabled()); + + dhtRes.nearEvicted(nearEvicted); } } - if (isNearEnabled(cacheCfg)) - ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); + if (nearRes != null) + sendDhtNearResponse(req, nearRes); + if (dhtRes == null && req.replyWithoutDelay()) { + dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), + req.partition(), + req.futureId(), + ctx.deploymentEnabled()); + } + + if (dhtRes != null) + sendDhtPrimaryResponse(nodeId, req, dhtRes); + else + sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId()); + } + + /** + * @param nodeId Primary node ID. + * @param req Request. + * @param dhtRes Response to send. + */ + private void sendDhtPrimaryResponse(UUID nodeId, + GridDhtAtomicAbstractUpdateRequest req, + GridDhtAtomicUpdateResponse dhtRes) { try { - if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) { - ctx.io().send(nodeId, res, ctx.ioPolicy()); + ctx.io().send(nodeId, dhtRes, ctx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() + - ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); - } + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent DHT response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", writeVer=" + req.writeVersion() + + ", node=" + nodeId + ']'); } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() + - ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); - } + } + catch (ClusterTopologyCheckedException ignored) { + U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + nodeId + ']'); + } + catch (IgniteCheckedException e) { + U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + nodeId + + ", res=" + dhtRes + ']', e); + } + } + + /** + * @param part Partition. + * @param primaryId Primary ID. + * @param futId Future ID. + */ + private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) { + Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get(); + + GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId); + + if (msg == null) { + msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), + new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE)); + + if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) { + GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId); + + msg.timeoutSender(timeoutSnd); + + ctx.time().addTimeoutObject(timeoutSnd); + } + + resMap.put(primaryId, msg); + } + + GridLongList futIds = msg.futureIds(); + + assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size(); + + futIds.add(futId); + + if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) { + resMap.remove(primaryId); + + sendDeferredUpdateResponse(primaryId, msg); + } + } + + /** + * @param primaryId Primary ID. + * @param msg Message. + */ + private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) { + try { + GridTimeoutObject timeoutSnd = msg.timeoutSender(); + + if (timeoutSnd != null) + ctx.time().removeTimeoutObject(timeoutSnd); + + ctx.io().send(primaryId, msg, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() + + ", node=" + primaryId + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send deferred DHT update response, node left [" + + "futIds=" + msg.futureIds() + ", node=" + primaryId + ']'); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send deferred DHT update response to remote node [" + + "futIds=" + msg.futureIds() + ", node=" + primaryId + ']', e); + } + } + + /** + * @param req Request. + * @param nearRes Response to send. + */ + private void sendDhtNearResponse(final GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicNearResponse nearRes) { + try { + ClusterNode node = ctx.discovery().node(req.nearNodeId()); + + if (node == null) + throw new ClusterTopologyCheckedException("Node failed: " + req.nearNodeId()); - // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. - sendDeferredUpdateResponse(nodeId, req.futureVersion()); + if (node.isLocal()) + processDhtAtomicNearResponse(node.id(), nearRes); + else + ctx.io().send(node, nearRes, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent DHT near response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", writeVer=" + req.writeVersion() + + ", node=" + req.nearNodeId() + ']'); } } catch (ClusterTopologyCheckedException ignored) { - U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() + - ", node=" + req.nodeId() + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send DHT near response, node left [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + req.nearNodeId() + ']'); + } } catch (IgniteCheckedException e) { - U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() + - ", node=" + nodeId + ", res=" + res + ']', e); + U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + req.nearNodeId() + + ", res=" + nearRes + ']', e); } } /** - * @param nodeId Node ID to send message to. - * @param ver Version to ack. + * @param nodeId Node ID. + * @param res Response. */ - private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) { - deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver); + private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) { + GridNearAtomicAbstractUpdateFuture updateFut = + (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); + + if (updateFut != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() + + ", node=" + nodeId + ']'); + } + + updateFut.onDhtResponse(nodeId, res); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to find future for DHT atomic near response [futId=" + res.futureId() + + ", node=" + nodeId + + ", res=" + res + ']'); + } + } } /** @@ -3355,18 +3386,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) { - GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + GridDhtAtomicAbstractUpdateFuture updateFut = + (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); if (updateFut != null) { if (msgLog.isDebugEnabled()) { - msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() + - ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']'); + msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() + + ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']'); } - updateFut.onResult(nodeId, res); + updateFut.onDhtResponse(nodeId, res); } else { - U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() + + U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']'); } } @@ -3377,19 +3409,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) { - for (GridCacheVersion ver : res.futureVersions()) { - GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver); + GridLongList futIds = res.futureIds(); + + assert futIds != null && futIds.size() > 0 : futIds; + + for (int i = 0; i < futIds.size(); i++) { + Long id = futIds.get(i); + + GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(id); if (updateFut != null) { if (msgLog.isDebugEnabled()) { - msgLog.debug("Received DHT atomic deferred update response [futId=" + ver + + msgLog.debug("Received DHT atomic deferred update response [futId=" + id + ", writeVer=" + res + ", node=" + nodeId + ']'); } - updateFut.onResult(nodeId); + updateFut.onDeferredResponse(nodeId); } else { - U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver + + U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + id + ", nodeId=" + nodeId + ", res=" + res + ']'); } } @@ -3404,16 +3442,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.io().send(nodeId, res, ctx.ioPolicy()); if (msgLog.isDebugEnabled()) - msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']'); + msgLog.debug("Sent near update response [futId=" + res.futureId() + ", node=" + nodeId + ']'); } catch (ClusterTopologyCheckedException ignored) { if (msgLog.isDebugEnabled()) { - msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() + + msgLog.debug("Failed to send near update response [futId=" + res.futureId() + ", node=" + nodeId + ']'); } } catch (IgniteCheckedException e) { - U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() + + U.error(msgLog, "Failed to send near update response [futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']', e); } } @@ -3482,9 +3520,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private GridDhtAtomicAbstractUpdateFuture dhtFut; /** */ - private boolean readersOnly; - - /** */ private GridCacheReturn invokeRes; /** @@ -3537,20 +3572,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) { this.dhtFut = dhtFut; } - - /** - * @return {@code True} if only readers (not backups) should be updated. - */ - private boolean readersOnly() { - return readersOnly; - } - - /** - * @param readersOnly {@code True} if only readers (not backups) should be updated. - */ - private void readersOnly(boolean readersOnly) { - this.readersOnly = readersOnly; - } } /** @@ -3569,4 +3590,71 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return Collections.emptyList(); } } + + /** + * + */ + interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> { + // No-op. + } + + /** + * + */ + private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable { + /** */ + private final int part; + + /** */ + private final UUID primaryId; + + /** */ + private final IgniteUuid id; + + /** */ + private final long endTime; + + /** + * @param part Partition. + * @param primaryId Primary ID. + */ + DeferredUpdateTimeout(int part, UUID primaryId) { + this.part = part; + this.primaryId = primaryId; + + endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT; + + id = IgniteUuid.fromUuid(primaryId); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void run() { + Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get(); + + GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId); + + if (msg != null && msg.timeoutSender() == this) { + msg.timeoutSender(null); + + resMap.remove(primaryId); + + sendDeferredUpdateResponse(primaryId, msg); + } + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + ctx.kernalContext().getStripedExecutorService().execute(part, this); + } + } }
