ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5c5eb5a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5c5eb5a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5c5eb5a Branch: refs/heads/ignite-4705 Commit: c5c5eb5ade63a64ccf8c193db77978c4e297fc19 Parents: 7287a93 Author: sboikov <[email protected]> Authored: Thu Feb 16 15:07:54 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Feb 16 15:23:12 2017 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 30 +- .../communication/GridIoMessageFactory.java | 10 +- .../processors/cache/GridCacheMessage.java | 4 +- .../cache/GridDeferredAckMessageSender.java | 11 +- .../GridDhtAtomicAbstractUpdateFuture.java | 49 ++- .../GridDhtAtomicAbstractUpdateRequest.java | 135 +++++++- .../dht/atomic/GridDhtAtomicCache.java | 320 ++++++++++++------- .../dht/atomic/GridDhtAtomicNearResponse.java | 268 ++++++++++++++++ .../atomic/GridDhtAtomicSingleUpdateFuture.java | 24 +- .../GridDhtAtomicSingleUpdateRequest.java | 56 ++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 98 +++--- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 7 + .../GridNearAtomicAbstractUpdateFuture.java | 3 +- .../dht/atomic/GridNearAtomicDhtResponse.java | 222 ------------- .../GridNearAtomicSingleUpdateFuture.java | 61 +++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../atomic/GridNearAtomicUpdateResponse.java | 62 +++- .../distributed/dht/atomic/UpdateErrors.java | 187 +++++++++++ .../distributed/near/GridNearAtomicCache.java | 17 +- .../cache/IgniteGetAndPutBenchmark.java | 2 +- .../cache/IgniteGetAndPutTxBenchmark.java | 2 +- 22 files changed, 1081 insertions(+), 497 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 16b1e01..6636bf2 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -46,17 +46,18 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -181,19 +182,20 @@ public class MessageCodeGenerator { // gen.generateAll(true); -// gen.generateAndWrite(GridNearAtomicDhtResponse.class); -// gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class); -// gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class); -// gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class); -// gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class); -// gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class); -// gen.generateAndWrite(GridDhtAtomicUpdateRequest.class); -// gen.generateAndWrite(GridDhtAtomicUpdateResponse.class); -// gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class); -// gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class); -// gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class); -// gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class); -// gen.generateAndWrite(GridNearAtomicUpdateResponse.class); + gen.generateAndWrite(UpdateErrors.class); + gen.generateAndWrite(GridDhtAtomicNearResponse.class); + //gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class); + gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class); + gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class); + gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class); + gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class); + gen.generateAndWrite(GridDhtAtomicUpdateRequest.class); + gen.generateAndWrite(GridDhtAtomicUpdateResponse.class); + gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class); + gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class); + gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class); + gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class); + gen.generateAndWrite(GridNearAtomicUpdateResponse.class); // gen.generateAndWrite(GridMessageCollection.class); // gen.generateAndWrite(DataStreamerEntry.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 769a615..5ed46ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -67,15 +67,16 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -174,8 +175,13 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -46: + msg = new UpdateErrors(); + + break; + case -45: - msg = new GridNearAtomicDhtResponse(); + msg = new GridDhtAtomicNearResponse(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 0646d5a..b9fb56a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -501,7 +501,7 @@ public abstract class GridCacheMessage implements Message { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("ForLoopReplaceableByForEach") - protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col, + public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col, GridCacheContext ctx) throws IgniteCheckedException { if (col == null) return; @@ -553,7 +553,7 @@ public abstract class GridCacheMessage implements Message { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("ForLoopReplaceableByForEach") - protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col, + public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col, GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java index 8df883a..37ecc79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java @@ -21,7 +21,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -41,16 +40,16 @@ public abstract class GridDeferredAckMessageSender<T> { private GridTimeoutProcessor time; /** Closure processor. */ - public GridClosureProcessor closure; + public GridClosureProcessor c; /** * @param time Time. - * @param closure Closure. + * @param c Closure. */ public GridDeferredAckMessageSender(GridTimeoutProcessor time, - GridClosureProcessor closure) { + GridClosureProcessor c) { this.time = time; - this.closure = closure; + this.c = c; } /** @@ -151,7 +150,7 @@ public abstract class GridDeferredAckMessageSender<T> { /** {@inheritDoc} */ @Override public void onTimeout() { if (guard.compareAndSet(false, true)) { - closure.runLocalSafe(new Runnable() { + c.runLocalSafe(new Runnable() { @Override public void run() { writeLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 10d1c4b..d494d98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -93,9 +93,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Continuous query closures. */ private Collection<CI1<Boolean>> cntQryClsrs; - /** */ - private final boolean waitForExchange; - /** Response count. */ private volatile int resCnt; @@ -113,14 +110,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes) { this.cctx = cctx; - - futId = cctx.mvcc().atomicFutureId(); this.updateReq = updateReq; this.completionCb = completionCb; this.updateRes = updateRes; this.writeVer = writeVer; - waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); + futId = cctx.mvcc().atomicFutureId(); if (log == null) { msgLog = cctx.shared().atomicMessageLogger(); @@ -130,6 +125,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** {@inheritDoc} */ @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + boolean waitForExchange = !updateReq.topologyLocked(); + if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0) return this; @@ -160,7 +157,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @param updateCntr Partition update counter. */ @SuppressWarnings("ForLoopReplaceableByForEach") - final void addWriteEntry(GridDhtCacheEntry entry, + final void addWriteEntry( + UUID nearNodeId, + GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, @@ -190,7 +189,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte if (updateReq == null) { updateReq = createRequest( - node, + node.id(), + nearNodeId, futId, writeVer, syncMode, @@ -236,7 +236,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). */ - final void addNearWriteEntries(Collection<UUID> readers, + final void addNearWriteEntries( + UUID nearNodeId, + Collection<UUID> readers, GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, @@ -259,7 +261,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte continue; updateReq = createRequest( - node, + node.id(), + nearNodeId, futId, writeVer, syncMode, @@ -352,9 +355,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * Sends requests to remote nodes. */ final void map() { + boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC; + if (!F.isEmpty(mappings)) { + List<UUID> dhtNodes = null; + + if (fullSync) { + dhtNodes = new ArrayList<>(mappings.size()); + + dhtNodes.addAll(mappings.keySet()); + } + for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) { try { + req.dhtNodes(dhtNodes); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (msgLog.isDebugEnabled()) { @@ -383,7 +398,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte // Send response right away if no ACKs from backup is required. // Backups will send ACKs anyway, future will be completed after all backups have replied. - if (updateReq.writeSynchronizationMode() != FULL_SYNC) + if (!fullSync) completionCb.apply(updateReq, updateRes); } @@ -400,7 +415,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte } /** - * @param node Node. + * @param nodeId Node ID. + * @param nodeId Near node ID. * @param futId Future ID. * @param writeVer Update version. * @param syncMode Write synchronization mode. @@ -411,7 +427,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @return Request. */ protected abstract GridDhtAtomicAbstractUpdateRequest createRequest( - ClusterNode node, + UUID nodeId, + UUID nearNodeId, long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @@ -449,9 +466,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte for (CI1<Boolean> clsr : cntQryClsrs) clsr.apply(suc); } - - if (updateReq.writeSynchronizationMode() == FULL_SYNC) - completionCb.apply(updateReq, updateRes); +// +// if (updateReq.writeSynchronizationMode() == FULL_SYNC) +// completionCb.apply(updateReq, updateRes); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index 7aa440d..3edbf8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -18,10 +18,13 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.List; import java.util.UUID; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; @@ -29,6 +32,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** @@ -46,6 +52,16 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag @GridDirectTransient private boolean onRes; + /** */ + private UUID nearNodeId; + + /** */ + private long nearFutId; + + /** */ + @GridDirectCollection(UUID.class) + private List<UUID> dhtNodes; + /** * Empty constructor required by {@link Externalizable}. */ @@ -58,10 +74,35 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag * * @param cacheId Cache ID. * @param nodeId Node ID. + * @param nearNodeId Near node ID. + * @param nearFutId Future ID on near node. */ - protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) { + protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId, UUID nearNodeId, long nearFutId) { this.cacheId = cacheId; this.nodeId = nodeId; + this.nearNodeId = nearNodeId; + this.nearFutId = nearFutId; + } + + /** + * @return Near node ID. + */ + public UUID nearNodeId() { + return nearNodeId; + } + + /** + * @param dhtNodes DHT nodes. + */ + public void dhtNodes(List<UUID> dhtNodes) { + this.dhtNodes = dhtNodes; + } + + /** + * @return DHT nodes. + */ + public List<UUID> dhtNodes() { + return dhtNodes; } /** {@inheritDoc} */ @@ -166,11 +207,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag public abstract int taskNameHash(); /** - * @return Version assigned on primary node. + * @return Future ID on primary node. */ public abstract long futureId(); /** + * @return Future ID on near node. + */ + public final long nearFutureId() { + return nearFutId; + } + + /** * @return Write version. */ public abstract GridCacheVersion writeVersion(); @@ -284,4 +332,87 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag * @return Optional arguments for entry processor. */ @Nullable public abstract Object[] invokeArguments(); + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 6; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeLong("nearFutId", nearFutId)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeUuid("nearNodeId", nearNodeId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + nearFutId = reader.readLong("nearFutId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + nearNodeId = reader.readUuid("nearNodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 87ac54b..3b81ee7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -55,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; @@ -93,14 +92,13 @@ import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionIsolation; @@ -112,6 +110,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; @@ -211,17 +210,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { - if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { - assert req.writeSynchronizationMode() != FULL_ASYNC : req; - - // Always send reply in CLOCK ordering mode. - sendNearUpdateReply(res.nodeId(), res); - - return; - } - - // Request should be for primary keys only in PRIMARY ordering mode. - assert req.hasPrimary() : req; +// if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { +// assert req.writeSynchronizationMode() != FULL_ASYNC : req; +// +// // Always send reply in CLOCK ordering mode. +// sendNearUpdateReply(res.nodeId(), res); +// +// return; +// } +// +// // Request should be for primary keys only in PRIMARY ordering mode. +// assert req.hasPrimary() : req; if (req.writeSynchronizationMode() != FULL_ASYNC) sendNearUpdateReply(res.nodeId(), res); @@ -422,14 +421,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), GridNearAtomicDhtResponse.class, new CI2<UUID, GridNearAtomicDhtResponse>() { - @Override public void apply(UUID uuid, GridNearAtomicDhtResponse msg) { - + ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() { + @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { + processDhtAtomicNearResponse(uuid, msg); } @Override public String toString() { - return "GridDhtAtomicDeferredUpdateResponse handler " + - "[msgIdx=" + GridNearAtomicDhtResponse.CACHE_MSG_IDX + ']'; + return "GridDhtAtomicNearResponse handler " + + "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']'; } }); @@ -1819,12 +1818,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } - // Do not check topology version for CLOCK versioning since - // partition exchange will wait for near update future (if future is on server node). - // Also do not check topology version if topology was locked on near node by + // Do not check topology version if topology was locked on near node by // external transaction or explicit lock. - if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || - !needRemap(req.topologyVersion(), top.topologyVersion())) { + if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { @@ -1836,19 +1832,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean hasNear = ctx.discovery().cacheNearNode(node, name()); - GridCacheVersion ver = req.updateVersion(); - - if (ver == null) { - // Assign next version for update inside entries lock. - ver = ctx.versions().next(top.topologyVersion()); + // Assign next version for update inside entries lock. + GridCacheVersion ver = ctx.versions().next(top.topologyVersion()); - if (hasNear) - res.nearVersion(ver); + if (hasNear) + res.nearVersion(ver); - if (msgLog.isDebugEnabled()) { - msgLog.debug("Assigned update version [futId=" + req.futureId() + - ", writeVer=" + ver + ']'); - } + if (msgLog.isDebugEnabled()) { + msgLog.debug("Assigned update version [futId=" + req.futureId() + + ", writeVer=" + ver + ']'); } assert ver != null : "Got null version for update request: " + req; @@ -2413,7 +2405,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Updates locked entries one-by-one. * - * @param node Originating node. + * @param nearNode Originating node. * @param hasNear {@code True} if originating node has near cache. * @param req Update request. * @param res Update response. @@ -2429,7 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @throws GridCacheEntryRemovedException Should be never thrown. */ private UpdateSingleResult updateSingle( - ClusterNode node, + ClusterNode nearNode, boolean hasNear, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, @@ -2473,9 +2465,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; - boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(), - req.topologyVersion()); - Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i); Collection<UUID> readers = null; @@ -2483,38 +2472,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (checkReaders) { readers = entry.readers(); - filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); + filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id())); } GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, - node.id(), + nearNode.id(), locNodeId, op, writeVal, req.invokeArguments(), - (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())) - && writeThrough() && !req.skipStore(), + writeThrough() && !req.skipStore(), !req.skipStore(), sndPrevVal || req.returnValue(), req.keepBinary(), expiry, - true, - true, - primary, - ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. + /*event*/true, + /*metrics*/true, + /*primary*/true, + /*verCheck*/false, topVer, req.filter(), - replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, + replicate ? DR_PRIMARY : DR_NONE, newConflictTtl, newConflictExpireTime, newConflictVer, - true, + /*conflictResolve*/true, intercept, req.subjectId(), taskName, - null, - null, + /*prevVal*/null, + /*updateCntr*/null, dhtFut); if (dhtFut == null && !F.isEmpty(filteredReaders)) { @@ -2535,7 +2523,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { EntryProcessor<Object, Object, Object> entryProcessor = null; if (!readersOnly) { - dhtFut.addWriteEntry(entry, + dhtFut.addWriteEntry( + nearNode.id(), + entry, updRes.newValue(), entryProcessor, updRes.newTtl(), @@ -2547,7 +2537,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (!F.isEmpty(filteredReaders)) - dhtFut.addNearWriteEntries(filteredReaders, + dhtFut.addNearWriteEntries( + nearNode.id(), + filteredReaders, entry, updRes.newValue(), entryProcessor, @@ -2562,8 +2554,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (hasNear) { - if (primary && updRes.sendToDht()) { - if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) { + if (updRes.sendToDht()) { + if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) { // If put the same value as in request then do not need to send it back. if (op == TRANSFORM || writeVal != updRes.newValue()) { res.addNearValue(i, @@ -2575,13 +2567,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime()); if (updRes.newValue() != null) { - IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer); assert f == null : f; } } - else if (F.contains(readers, node.id())) // Reader became primary or backup. - entry.removeReader(node.id(), req.messageId()); + else if (F.contains(readers, nearNode.id())) // Reader became primary or backup. + entry.removeReader(nearNode.id(), req.messageId()); else res.addSkippedIndex(i); } @@ -2603,7 +2595,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) { if (retVal == null) - retVal = new GridCacheReturn(node.isLocal()); + retVal = new GridCacheReturn(nearNode.isLocal()); retVal.addEntryProcessResult(ctx, k, @@ -2619,7 +2611,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheObject ret = updRes.oldValue(); retVal = new GridCacheReturn(ctx, - node.isLocal(), + nearNode.isLocal(), req.keepBinary(), req.returnValue() ? ret : null, updRes.success()); @@ -2639,7 +2631,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param firstEntryIdx Index of the first entry in the request keys collection. * @param entries Entries to update. * @param ver Version to set. - * @param node Originating node. + * @param nearNode Originating node. * @param writeVals Write values. * @param putMap Values to put. * @param rmvKeys Keys to remove. @@ -2661,7 +2653,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final int firstEntryIdx, final List<GridDhtCacheEntry> entries, final GridCacheVersion ver, - final ClusterNode node, + final ClusterNode nearNode, @Nullable final List<CacheObject> writeVals, @Nullable final Map<KeyCacheObject, CacheObject> putMap, @Nullable final Collection<KeyCacheObject> rmvKeys, @@ -2765,12 +2757,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (checkReaders) { readers = entry.readers(); - filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); + filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id())); } GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, - node.id(), + nearNode.id(), locNodeId, op, writeVal, @@ -2831,7 +2823,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); if (!batchRes.readersOnly()) { - dhtFut.addWriteEntry(entry, + dhtFut.addWriteEntry( + nearNode.id(), + entry, writeVal, entryProcessor, updRes.newTtl(), @@ -2843,7 +2837,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (!F.isEmpty(filteredReaders)) - dhtFut.addNearWriteEntries(filteredReaders, + dhtFut.addNearWriteEntries( + nearNode.id(), + filteredReaders, entry, writeVal, entryProcessor, @@ -2853,7 +2849,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) { if (primary) { - if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) { + if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) { int idx = firstEntryIdx + i; if (req.operation() == TRANSFORM) { @@ -2866,13 +2862,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); if (writeVal != null || entry.hasValue()) { - IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); + IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer); assert f == null : f; } } - else if (readers.contains(node.id())) // Reader became primary or backup. - entry.removeReader(node.id(), req.messageId()); + else if (readers.contains(nearNode.id())) // Reader became primary or backup. + entry.removeReader(nearNode.id(), req.messageId()); else res.addSkippedIndex(firstEntryIdx + i); } @@ -3148,31 +3144,35 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean force ) { - if (!force) { - if (updateReq.fastMap()) - return null; - - AffinityTopologyVersion topVer = updateReq.topologyVersion(); - - Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer); - - // We are on primary node for some key. - assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer + - ctx.kernalContext().discovery().discoCache(topVer) + ']'; - - if (nodes.size() == 1) { - if (log.isDebugEnabled()) - log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " + - "[topVer=" + topVer + ", updateReq=" + updateReq + ']'); - - return null; - } - } - if (updateReq.size() == 1) return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); else return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); +// if (!force) { +// if (updateReq.fastMap()) +// return null; +// +// AffinityTopologyVersion topVer = updateReq.topologyVersion(); +// +// Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer); +// +// // We are on primary node for some key. +// assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer + +// ctx.kernalContext().discovery().discoCache(topVer) + ']'; +// +// if (nodes.size() == 1) { +// if (log.isDebugEnabled()) +// log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " + +// "[topVer=" + topVer + ", updateReq=" + updateReq + ']'); +// +// return null; +// } +// } +// +// if (updateReq.size() == 1) +// return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); +// else +// return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); } /** @@ -3225,9 +3225,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheVersion ver = req.writeVersion(); - // Always send update reply. - GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(), - ctx.deploymentEnabled()); + GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ? + new GridDhtAtomicNearResponse(req.nearFutureId(), req.dhtNodes()) : null; Boolean replicate = ctx.isDrEnabled(); @@ -3311,39 +3310,113 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Ignore. } catch (IgniteCheckedException e) { - res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e)); + if (nearRes != null) + nearRes.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e)); + + U.error(log, "Failed to update key on backup node: " + key, e); } } - if (isNearEnabled(cacheCfg)) - ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); + GridDhtAtomicUpdateResponse dhtRes = null; - try { - if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) { - ctx.io().send(nodeId, res, ctx.ioPolicy()); + if (isNearEnabled(cacheCfg)) { + List<KeyCacheObject> nearEvicted = + ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes); - if (msgLog.isDebugEnabled()) { - msgLog.debug("Sent DHT atomic update response [futId=" + req.futureId() + - ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); - } - } + dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(), ctx.deploymentEnabled()); + + dhtRes.nearEvicted(nearEvicted); + } + + final boolean RES_AFTER_ACK = false; + + if (nearRes != null) { + if (RES_AFTER_ACK) + sendDhtNearResponse(nodeId, req, nearRes); else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureId() + - ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); - } + sendDhtNearResponse(null, req, nearRes); - // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. sendDeferredUpdateResponse(nodeId, req.futureId()); } } + else + sendDeferredUpdateResponse(nodeId, req.futureId()); + + if (dhtRes != null) + sendDhtPrimaryResponse(nodeId, req, dhtRes); + } + + /** + * @param nodeId Primary node ID. + * @param req Request. + * @param dhtRes Response to send. + */ + private void sendDhtPrimaryResponse(UUID nodeId, + GridDhtAtomicAbstractUpdateRequest req, + GridDhtAtomicUpdateResponse dhtRes) { + try { + ctx.io().send(nodeId, dhtRes, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent DHT response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", writeVer=" + req.writeVersion() + + ", node=" + nodeId + ']'); + } + } catch (ClusterTopologyCheckedException ignored) { - U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureId() + - ", node=" + req.nodeId() + ']'); + U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + nodeId + ']'); } catch (IgniteCheckedException e) { - U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureId() + - ", node=" + nodeId + ", res=" + res + ']', e); + U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + nodeId + + ", res=" + dhtRes + ']', e); + } + } + + /** + * @param req Request. + * @param nearRes Response to send. + */ + private void sendDhtNearResponse(final UUID primaryId, + final GridDhtAtomicAbstractUpdateRequest req, + GridDhtAtomicNearResponse nearRes) { + try { + ClusterNode node = ctx.discovery().node(req.nearNodeId()); + + if (node == null) + throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId()); + + if (primaryId != null) { + ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy(), new IgniteInClosure<IgniteException>() { + @Override public void apply(IgniteException e) { + sendDeferredUpdateResponse(primaryId, req.futureId()); + } + }); + } + else + ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent DHT near response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", writeVer=" + req.writeVersion() + + ", node=" + req.nearNodeId() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + U.warn(msgLog, "Failed to send DHT near response, node left [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + req.nearNodeId() + ']'); + } + catch (IgniteCheckedException e) { + U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + req.nearNodeId() + + ", res=" + nearRes + ']', e); } } @@ -3359,8 +3432,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nodeId Node ID. * @param res Response. */ - private void processNearAtomicDhtResponse(UUID nodeId, GridNearAtomicDhtResponse res) { + private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) { + GridNearAtomicAbstractUpdateFuture updateFut = + (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); + if (updateFut != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() + + ", node=" + nodeId + ']'); + } + + updateFut.onResult(nodeId, res); + } + else { + U.warn(msgLog, "Failed to find update future DHT atomic near response [futId=" + res.futureId() + + ", node=" + nodeId + + ", res=" + res + ']'); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java new file mode 100644 index 0000000..628e1dc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * TODO IGNITE-4705: no not send mapping if it == affinity? + */ +public class GridDhtAtomicNearResponse extends GridCacheMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Message index. */ + public static final int CACHE_MSG_IDX = nextIndexId(); + + /** */ + private static final int HAS_RESULT_MASK = 0x1; + + /** */ + private static final int RESULT_SUCCESS_MASK = 0x2; + + /** */ + private long futId; + + /** */ + @GridDirectCollection(UUID.class) + private List<UUID> mapping; + + /** */ + private byte flags; + + /** */ + private UpdateErrors errors; + + /** + * + */ + public GridDhtAtomicNearResponse() { + // No-op. + } + + /** + * @param futId Future ID. + * @param mapping Update mapping. + */ + public GridDhtAtomicNearResponse(long futId, List<UUID> mapping) { + this.futId = futId; + this.mapping = mapping; + } + + /** + * @param key Key. + * @param e Error. + */ + public void addFailedKey(KeyCacheObject key, Throwable e) { + if (errors == null) + errors = new UpdateErrors(); + + errors.addFailedKey(key, e); + } + + /** + * @param success Success flag. + */ + public void setResult(boolean success) { + setFlag(true, HAS_RESULT_MASK); + + setFlag(success, RESULT_SUCCESS_MASK); + } + + /** + * @return Operation result. + */ + public GridCacheReturn result() { + assert hasResult(); + + return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK)); + } + + /** + * @return {@code True} if response contains operation result. + */ + public boolean hasResult() { + return isFlag(HAS_RESULT_MASK); + } + + /** + * @return Update mapping. + */ + public List<UUID> mapping() { + return mapping; + } + + /** + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -45; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 7; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (errors != null) + errors.prepareMarshal(this, ctx.cacheContext(cacheId)); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (errors != null) + errors.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeMessage("errors", errors)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + errors = reader.readMessage("errors"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtAtomicNearResponse.class); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index 0c8e482..671034c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -44,9 +43,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture /** */ private static final long serialVersionUID = 0L; - /** */ - private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4"); - /** Future keys. */ private KeyCacheObject key; @@ -97,7 +93,8 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture /** {@inheritDoc} */ @Override protected GridDhtAtomicAbstractUpdateRequest createRequest( - ClusterNode node, + UUID nodeId, + UUID nearNodeId, long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @@ -106,11 +103,13 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture long conflictExpireTime, @Nullable GridCacheVersion conflictVer ) { - if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) { + if (canUseSingleRequest(ttl, conflictExpireTime, conflictVer)) { return new GridDhtAtomicSingleUpdateRequest( cctx.cacheId(), - node.id(), + nodeId, futId, + nearNodeId, + updateReq.futureId(), writeVer, syncMode, topVer, @@ -123,8 +122,10 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture else { return new GridDhtAtomicUpdateRequest( cctx.cacheId(), - node.id(), + nodeId, futId, + nearNodeId, + updateReq.futureId(), writeVer, syncMode, topVer, @@ -167,18 +168,15 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture } /** - * @param node Target node * @param ttl TTL. * @param conflictExpireTime Conflict expire time. * @param conflictVer Conflict version. * @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}. */ - private boolean canUseSingleRequest(ClusterNode node, - long ttl, + private boolean canUseSingleRequest(long ttl, long conflictExpireTime, @Nullable GridCacheVersion conflictVer) { - return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 && - (ttl == CU.TTL_NOT_CHANGED) && + return (ttl == CU.TTL_NOT_CHANGED) && (conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) && conflictVer == null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index 127c2be..e46c843 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -51,7 +51,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** Near cache key flag. */ private static final int NEAR_FLAG_MASK = 0x80; - /** Future version. */ + /** Future ID on primary. */ protected long futId; /** Write version. */ @@ -116,6 +116,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat int cacheId, UUID nodeId, long futId, + UUID nearNodeId, + long nearFutId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, @@ -125,7 +127,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat boolean keepBinary, boolean skipStore ) { - super(cacheId, nodeId); + super(cacheId, nodeId, nearNodeId, nearFutId); this.futId = futId; this.writeVer = writeVer; this.syncMode = syncMode; @@ -423,73 +425,73 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } switch (writer.state()) { - case 3: + case 6: if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); - case 4: + case 7: if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); - case 5: + case 8: if (!writer.writeMessage("key", key)) return false; writer.incrementState(); - case 6: + case 9: if (!writer.writeInt("partId", partId)) return false; writer.incrementState(); - case 7: + case 10: if (!writer.writeMessage("prevVal", prevVal)) return false; writer.incrementState(); - case 8: + case 11: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 9: + case 12: if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); - case 10: + case 13: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 11: + case 14: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 12: + case 15: if (!writer.writeLong("updateCntr", updateCntr)) return false; writer.incrementState(); - case 13: + case 16: if (!writer.writeMessage("val", val)) return false; writer.incrementState(); - case 14: + case 17: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -511,7 +513,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat return false; switch (reader.state()) { - case 3: + case 6: flags = reader.readByte("flags"); if (!reader.isLastRead()) @@ -519,7 +521,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 4: + case 7: futId = reader.readLong("futId"); if (!reader.isLastRead()) @@ -527,7 +529,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 5: + case 8: key = reader.readMessage("key"); if (!reader.isLastRead()) @@ -535,7 +537,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 6: + case 9: partId = reader.readInt("partId"); if (!reader.isLastRead()) @@ -543,7 +545,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 7: + case 10: prevVal = reader.readMessage("prevVal"); if (!reader.isLastRead()) @@ -551,7 +553,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 8: + case 11: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -559,7 +561,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 9: + case 12: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -571,7 +573,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 10: + case 13: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -579,7 +581,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 11: + case 14: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -587,7 +589,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 12: + case 15: updateCntr = reader.readLong("updateCntr"); if (!reader.isLastRead()) @@ -595,7 +597,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 13: + case 16: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -603,7 +605,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 14: + case 17: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -653,7 +655,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 18; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 7cb75fa..ea6a1b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -122,7 +122,9 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { } /** {@inheritDoc} */ - @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node, + @Override protected GridDhtAtomicAbstractUpdateRequest createRequest( + UUID nodeId, + UUID nearNodeId, long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @@ -133,8 +135,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { ) { return new GridDhtAtomicUpdateRequest( cctx.cacheId(), - node.id(), + nodeId, futId, + nearNodeId, + updateReq.futureId(), writeVer, syncMode, topVer,
