ignite-4705 Atomic cache protocol change: notify client node from backups
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbc472fe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbc472fe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbc472fe Branch: refs/heads/ignite-4768-1 Commit: cbc472fe7f058db42ce49652c85981c7b797d229 Parents: f59f46d Author: sboikov <[email protected]> Authored: Mon Mar 13 18:07:20 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Mar 13 18:08:50 2017 +0300 ---------------------------------------------------------------------- .../rest/protocols/tcp/MockNioSession.java | 5 +- .../org/apache/ignite/internal/IgnitionEx.java | 5 +- .../internal/binary/BinaryObjectImpl.java | 43 +- .../connection/GridClientNioTcpConnection.java | 2 +- .../managers/communication/GridIoManager.java | 5 +- .../communication/GridIoMessageFactory.java | 20 +- .../processors/cache/CacheObjectContext.java | 3 +- .../processors/cache/GridCacheAtomicFuture.java | 5 +- .../processors/cache/GridCacheIoManager.java | 83 +- .../processors/cache/GridCacheMapEntry.java | 12 +- .../processors/cache/GridCacheMessage.java | 17 +- .../processors/cache/GridCacheMvccManager.java | 48 +- .../processors/cache/GridCacheProcessor.java | 1 - .../processors/cache/GridCacheReturn.java | 6 +- .../cache/GridDeferredAckMessageSender.java | 17 +- .../processors/cache/KeyCacheObjectImpl.java | 65 +- .../dht/GridClientPartitionTopology.java | 8 + .../dht/GridDhtPartitionTopology.java | 9 + .../dht/GridDhtPartitionTopologyImpl.java | 23 +- .../GridDhtAtomicAbstractUpdateFuture.java | 298 +++--- .../GridDhtAtomicAbstractUpdateRequest.java | 392 +++++++- .../dht/atomic/GridDhtAtomicCache.java | 896 +++++++++-------- .../GridDhtAtomicDeferredUpdateResponse.java | 68 +- .../dht/atomic/GridDhtAtomicNearResponse.java | 314 ++++++ .../atomic/GridDhtAtomicSingleUpdateFuture.java | 101 +- .../GridDhtAtomicSingleUpdateRequest.java | 277 +----- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 89 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 325 ++----- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 124 +-- ...idNearAtomicAbstractSingleUpdateRequest.java | 481 +--------- .../GridNearAtomicAbstractUpdateFuture.java | 468 +++++++-- .../GridNearAtomicAbstractUpdateRequest.java | 480 ++++++++- .../GridNearAtomicCheckUpdateRequest.java | 175 ++++ .../atomic/GridNearAtomicFullUpdateRequest.java | 487 +--------- ...GridNearAtomicSingleUpdateFilterRequest.java | 23 +- .../GridNearAtomicSingleUpdateFuture.java | 617 ++++++------ ...GridNearAtomicSingleUpdateInvokeRequest.java | 37 +- .../GridNearAtomicSingleUpdateRequest.java | 65 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 962 +++++++++++-------- .../atomic/GridNearAtomicUpdateResponse.java | 192 ++-- .../distributed/dht/atomic/UpdateErrors.java | 222 +++++ .../distributed/near/GridNearAtomicCache.java | 27 +- .../continuous/CacheContinuousQueryHandler.java | 2 +- .../cacheobject/IgniteCacheObjectProcessor.java | 5 +- .../IgniteCacheObjectProcessorImpl.java | 18 +- .../ignite/internal/util/StripedExecutor.java | 8 +- .../util/future/GridCompoundFuture.java | 11 +- .../internal/util/ipc/IpcToNioAdapter.java | 7 +- .../nio/GridConnectionBytesVerifyFilter.java | 7 +- .../util/nio/GridNioAsyncNotifyFilter.java | 7 +- .../internal/util/nio/GridNioCodecFilter.java | 9 +- .../util/nio/GridNioEmbeddedFuture.java | 7 + .../ignite/internal/util/nio/GridNioFilter.java | 12 +- .../internal/util/nio/GridNioFilterAdapter.java | 7 +- .../internal/util/nio/GridNioFilterChain.java | 15 +- .../util/nio/GridNioFinishedFuture.java | 5 - .../ignite/internal/util/nio/GridNioFuture.java | 7 - .../internal/util/nio/GridNioFutureImpl.java | 18 +- .../ignite/internal/util/nio/GridNioServer.java | 83 +- .../internal/util/nio/GridNioSession.java | 7 +- .../internal/util/nio/GridNioSessionImpl.java | 9 +- .../util/nio/GridNioSessionMetaKey.java | 5 +- .../util/nio/GridShmemCommunicationClient.java | 6 +- .../util/nio/GridTcpNioCommunicationClient.java | 13 +- .../internal/util/nio/SessionWriteRequest.java | 7 - .../internal/util/nio/ssl/GridNioSslFilter.java | 12 +- .../util/nio/ssl/GridNioSslHandler.java | 29 +- .../communication/tcp/TcpCommunicationSpi.java | 2 +- .../org/apache/ignite/thread/IgniteThread.java | 41 +- .../ignite/thread/IgniteThreadFactory.java | 2 +- .../GridCommunicationSendMessageSelfTest.java | 2 +- .../cache/CacheRebalancingSelfTest.java | 16 +- .../GridCacheAbstractFailoverSelfTest.java | 2 - .../GridCacheAtomicMessageCountSelfTest.java | 22 +- .../IgniteCacheEntryListenerAbstractTest.java | 1 + ...niteCacheClientNodeChangingTopologyTest.java | 7 - .../IgniteCacheMessageRecoveryAbstractTest.java | 2 +- .../dht/GridCacheAtomicNearCacheSelfTest.java | 23 +- .../IgniteCachePutRetryAbstractSelfTest.java | 37 +- ...gniteCachePutRetryTransactionalSelfTest.java | 2 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 883 +++++++++++++++++ ...erNoStripedPoolMultiNodeFullApiSelfTest.java | 35 - .../near/GridCacheNearReadersSelfTest.java | 17 +- ...edNoStripedPoolMultiNodeFullApiSelfTest.java | 35 - ...eContinuousQueryAsyncFilterListenerTest.java | 2 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 31 +- ...eCacheContinuousQueryImmutableEntryTest.java | 2 +- .../nio/GridNioEmbeddedFutureSelfTest.java | 2 +- .../util/future/nio/GridNioFutureSelfTest.java | 25 +- .../nio/impl/GridNioFilterChainSelfTest.java | 12 +- .../file/GridFileSwapSpaceSpiSelfTest.java | 2 +- .../IgniteCacheFullApiSelfTestSuite.java | 8 +- .../testsuites/IgniteCacheTestSuite5.java | 3 + .../HadoopExternalCommunication.java | 9 +- .../communication/HadoopIpcToNioAdapter.java | 7 +- .../communication/HadoopMarshallerFilter.java | 10 +- .../cache/IgniteGetAndPutBenchmark.java | 2 +- .../cache/IgniteGetAndPutTxBenchmark.java | 2 +- 98 files changed, 5462 insertions(+), 3597 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java index 9bc4e7f..9d1755f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java @@ -19,11 +19,13 @@ package org.apache.ignite.internal.processors.rest.protocols.tcp; import java.net.InetSocketAddress; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.nio.GridNioFinishedFuture; import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; /** @@ -112,7 +114,8 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS } /** {@inheritDoc} */ - @Override public void sendNoFuture(Object msg) throws IgniteCheckedException { + @Override public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC) + throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 2d35cdb..f6cfe12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1703,8 +1703,9 @@ public class IgnitionEx { sysExecSvc.allowCoreThreadTimeOut(true); - if (cfg.getStripedPoolSize() > 0) - stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log); + validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool"); + + stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java index 7a81659..6fe1a3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java @@ -17,6 +17,17 @@ package org.apache.ignite.internal.binary; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Date; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectException; @@ -33,19 +44,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.sql.Time; -import java.sql.Timestamp; -import java.util.Date; -import java.util.UUID; - -import static java.nio.charset.StandardCharsets.*; +import static java.nio.charset.StandardCharsets.UTF_8; /** * Binary object implementation. @@ -74,7 +73,6 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern private boolean detachAllowed; /** */ - @GridDirectTransient private int part = -1; /** @@ -561,7 +559,6 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern start = in.readInt(); } - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -584,6 +581,12 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern writer.incrementState(); case 1: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + case 2: if (!writer.writeInt("start", detachAllowed ? 0 : start)) return false; @@ -611,6 +614,14 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern reader.incrementState(); case 1: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: start = reader.readInt("start"); if (!reader.isLastRead()) @@ -620,7 +631,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern } - return true; + return reader.afterMessageRead(BinaryObjectImpl.class); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java index 8937504..d3a30fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java @@ -229,7 +229,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { GridNioFuture<?> sslHandshakeFut = null; if (sslCtx != null) { - sslHandshakeFut = new GridNioFutureImpl<>(); + sslHandshakeFut = new GridNioFutureImpl<>(null); meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 0c0dbf7..23738d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -818,10 +818,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } - if (ctx.config().getStripedPoolSize() > 0 && - plc == GridIoPolicy.SYSTEM_POOL && - msg.partition() != Integer.MIN_VALUE - ) { + if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) { ctx.getStripedExecutorService().execute(msg.partition(), c); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6f95400..0548581 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -67,14 +67,17 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -118,11 +121,11 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage; -import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage; import org.apache.ignite.internal.processors.igfs.IgfsAckMessage; import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage; @@ -173,6 +176,21 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -47: + msg = new GridNearAtomicCheckUpdateRequest(); + + break; + + case -46: + msg = new UpdateErrors(); + + break; + + case -45: + msg = new GridDhtAtomicNearResponse(); + + break; + case -44: msg = new TcpCommunicationSpi.HandshakeMessage2(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java index c4203ef..a777ab6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java @@ -33,7 +33,8 @@ import org.apache.ignite.internal.util.typedef.F; /** * */ -@SuppressWarnings("TypeMayBeWeakened") public class CacheObjectContext { +@SuppressWarnings("TypeMayBeWeakened") +public class CacheObjectContext { /** */ private GridKernalContext kernalCtx; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 3e11d50..8df229e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -19,16 +19,15 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** * Update future for atomic cache. */ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { /** - * @return Future version. + * @return Future ID. */ - public GridCacheVersion version(); + public Long id(); /** * Gets future that will be completed when it is safe when update is finished on the given version of topology. http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 1f28201..1cd8fbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -47,15 +47,18 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -85,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -213,15 +217,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } if (fut != null && !fut.isDone()) { + Thread curThread = Thread.currentThread(); + + final int stripe = curThread instanceof IgniteThread ? ((IgniteThread)curThread).stripe() : -1; + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + Runnable c = new Runnable() { @Override public void run() { IgniteLogger log = cacheMsg.messageLogger(cctx); if (log.isDebugEnabled()) { StringBuilder msg0 = new StringBuilder("Process cache message after wait for " + - "affinity topology version ["); + "affinity topology version ["); appendMessageInfo(cacheMsg, nodeId, msg0).append(']'); @@ -230,7 +238,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { handleMessage(nodeId, cacheMsg); } - }); + }; + + if (stripe >= 0) + cctx.kernalContext().getStripedExecutorService().execute(stripe, c); + else + cctx.kernalContext().closure().runLocalSafe(c); } }); @@ -471,15 +484,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @param cacheMsg Cache message. * @return Atomic future ID if applicable for message. */ - @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) { + @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) { if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) - return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion(); + return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId(); else if (cacheMsg instanceof GridNearAtomicUpdateResponse) - return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion(); + return ((GridNearAtomicUpdateResponse) cacheMsg).futureId(); else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) - return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion(); + return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureId(); else if (cacheMsg instanceof GridDhtAtomicUpdateResponse) - return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion(); + return ((GridDhtAtomicUpdateResponse) cacheMsg).futureId(); + else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest) + return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId(); return null; } @@ -490,9 +505,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @return Atomic future ID if applicable for message. */ @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) { - if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) - return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion(); - else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) + if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion(); return null; @@ -561,12 +574,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( ctx.cacheId(), - req.futureVersion(), + req.partition(), + req.futureId(), ctx.deploymentEnabled()); res.onError(req.classError()); sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + + if (req.nearNodeId() != null) { + GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), + req.partition(), + req.nearFutureId(), + nodeId, + req.flags()); + + nearRes.errors(new UpdateErrors(req.classError())); + + sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy()); + } } break; @@ -577,7 +603,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, - req.futureVersion(), + req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); @@ -755,7 +783,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, - req.futureVersion(), + req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); @@ -771,7 +801,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, - req.futureVersion(), + req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); @@ -787,7 +819,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, - req.futureVersion(), + req.futureId(), + req.partition(), + false, ctx.deploymentEnabled()); res.error(req.classError()); @@ -802,12 +836,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( ctx.cacheId(), - req.futureVersion(), + req.partition(), + req.futureId(), ctx.deploymentEnabled()); res.onError(req.classError()); sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + + if (req.nearNodeId() != null) { + GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), + req.partition(), + req.nearFutureId(), + nodeId, + req.flags()); + + nearRes.errors(new UpdateErrors(req.classError())); + + sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy()); + } } break; @@ -894,7 +941,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings("unchecked") public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException { - assert !node.isLocal(); + assert !node.isLocal() : node; if (!onSend(msg, node.id())) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2237e22..54b4ed7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2170,8 +2170,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert conflictCtx != null; - boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; - // Use old value? if (conflictCtx.isUseOld()) { GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer; @@ -2180,7 +2178,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (!isNew() && // Not initial value, verCheck && // and atomic version check, oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal, - ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal, + ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, true) == 0 && // and both versions are equal, cctx.writeThrough() && // and store is enabled, primary) // and we are primary. { @@ -2226,13 +2224,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme conflictVer = null; } - boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; - // Perform version check only in case there was no explicit conflict resolution. if (conflictCtx == null) { if (verCheck) { - if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) { - if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) { + if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) >= 0) { + if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) == 0 && cctx.writeThrough() && primary) { if (log.isDebugEnabled()) log.debug("Received entry update with same version as current (will update store) " + "[entry=" + this + ", newVer=" + newVer + ']'); @@ -2307,7 +2303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else - assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 : + assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) <= 0 : "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']'; } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 0646d5a..4de465c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message { private static final long serialVersionUID = 0L; /** Maximum number of cache lookup indexes. */ - public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5; + public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7; /** Cache message index field name. */ public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX"; @@ -501,7 +501,7 @@ public abstract class GridCacheMessage implements Message { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("ForLoopReplaceableByForEach") - protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col, + public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col, GridCacheContext ctx) throws IgniteCheckedException { if (col == null) return; @@ -553,7 +553,7 @@ public abstract class GridCacheMessage implements Message { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("ForLoopReplaceableByForEach") - protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col, + public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col, GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException @@ -701,6 +701,17 @@ public abstract class GridCacheMessage implements Message { return reader.afterMessageRead(GridCacheMessage.class); } + /** + * @param str Bulder. + * @param name Flag name. + */ + protected final void appendFlag(StringBuilder str, String name) { + if (str.length() > 0) + str.append('|'); + + str.append(name); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheMessage.class, this, "cacheId", cacheId); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 4ec13fc..dff2c88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.DiscoveryEvent; @@ -105,9 +106,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { @GridToStringExclude private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap(); + /** */ + private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis()); + /** Pending atomic futures. */ - private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts = - new ConcurrentHashMap8<>(); + private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>(); /** Pending data streamer futures. */ private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>(); @@ -253,10 +256,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { cacheFut.onNodeLeft(discoEvt.eventNode().id()); if (cacheFut.isCancelled() || cacheFut.isDone()) { - GridCacheVersion futVer = cacheFut.version(); + Long futId = cacheFut.id(); - if (futVer != null) - atomicFuts.remove(futVer, cacheFut); + if (futId != null) + atomicFuts.remove(futId, cacheFut); } } } @@ -423,14 +426,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @param futVer Future ID. + * @return ID for atomic cache update future. + */ + public long atomicFutureId() { + return atomicFutId.incrementAndGet(); + } + + /** + * @param futId Future ID. * @param fut Future. * @return {@code False} if future was forcibly completed with error. */ - public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { - IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); + public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) { + IgniteInternalFuture<?> old = atomicFuts.put(futId, fut); - assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; + assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']'; return onFutureAdded(fut); } @@ -443,6 +453,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @return Number of pending atomic futures. + */ + public int atomicFuturesCount() { + return atomicFuts.size(); + } + + /** * @return Collection of pending data streamer futures. */ public Collection<DataStreamerFuture> dataStreamerFutures() { @@ -452,19 +469,19 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * Gets future by given future ID. * - * @param futVer Future ID. + * @param futId Future ID. * @return Future. */ - @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) { - return atomicFuts.get(futVer); + @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) { + return atomicFuts.get(futId); } /** - * @param futVer Future ID. + * @param futId Future ID. * @return Removed future. */ - @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) { - return atomicFuts.remove(futVer); + @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) { + return atomicFuts.remove(futId); } /** @@ -481,6 +498,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @param topVer Topology version. + * @return Future. */ public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) { final DataStreamerFuture fut = new DataStreamerFuture(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index f7ac812..c7ac31a 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -859,7 +859,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!ctx.clientNode() && !ctx.isDaemon()) addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000)); - } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java index 02c882c..c5d4066 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java @@ -126,12 +126,10 @@ public class GridCacheReturn implements Externalizable, Message { } /** - * Checks if value is not {@code null}. * - * @return {@code True} if value is not {@code null}. */ - public boolean hasValue() { - return v != null; + public boolean emptyResult() { + return !invokeRes && v == null && cacheObj == null && success; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java index 7145dc2..5265ec9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java @@ -41,30 +41,31 @@ public abstract class GridDeferredAckMessageSender { private GridTimeoutProcessor time; /** Closure processor. */ - public GridClosureProcessor closure; + public GridClosureProcessor c; /** * @param time Time. - * @param closure Closure. + * @param c Closure. */ public GridDeferredAckMessageSender(GridTimeoutProcessor time, - GridClosureProcessor closure) { + GridClosureProcessor c) { this.time = time; - this.closure = closure; + this.c = c; } /** - * + * @return Timeout. */ public abstract int getTimeout(); /** - * + * @return Buffer size. */ public abstract int getBufferSize(); /** - * + * @param nodeId Node ID. + * @param vers Versions to send. */ public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers); @@ -151,7 +152,7 @@ public abstract class GridDeferredAckMessageSender { /** {@inheritDoc} */ @Override public void onTimeout() { if (guard.compareAndSet(false, true)) { - closure.runLocalSafe(new Runnable() { + c.runLocalSafe(new Runnable() { @Override public void run() { writeLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 146e554..4f8570c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.processors.cache; +import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** @@ -29,7 +31,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb private static final long serialVersionUID = 0L; /** */ - @GridDirectTransient private int part = -1; /** @@ -42,14 +43,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb /** * @param val Value. * @param valBytes Value bytes. - */ - public KeyCacheObjectImpl(Object val, byte[] valBytes) { - this(val, valBytes, -1); - } - - /** - * @param val Value. - * @param valBytes Value bytes. * @param part Partition. */ public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) { @@ -130,7 +123,57 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 1; + return 2; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 1: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(KeyCacheObjectImpl.class); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 1: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + } + + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 2af822a..6ca15de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -32,6 +32,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -409,6 +410,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Nullable @Override public List<ClusterNode> nodes(int p, + AffinityAssignment affAssignment, + List<ClusterNode> affNodes) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { lock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index bdd84b0..605150a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -23,6 +23,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -170,6 +171,14 @@ public interface GridDhtPartitionTopology { /** * @param p Partition ID. + * @param affAssignment Assignments. + * @param affNodes Node assigned for given partition by affinity. + * @return Collection of all nodes responsible for this partition with primary node being first. + */ + @Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes); + + /** + * @param p Partition ID. * @return Collection of all nodes who {@code own} this partition. */ public List<ClusterNode> owners(int p); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 49de280..53257d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -828,11 +828,32 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Nullable @Override public List<ClusterNode> nodes(int p, + AffinityAssignment affAssignment, + List<ClusterNode> affNodes) { + return nodes0(p, affAssignment, affNodes); + } + + /** {@inheritDoc} */ @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { AffinityAssignment affAssignment = cctx.affinity().assignment(topVer); List<ClusterNode> affNodes = affAssignment.get(p); + List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes); + + return nodes != null ? nodes : affNodes; + } + + /** + * @param p Partition. + * @param affAssignment Assignments. + * @param affNodes Node assigned for given partition by affinity. + * @return Nodes responsible for given partition (primary is first). + */ + @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) { + AffinityTopologyVersion topVer = affAssignment.topologyVersion(); + lock.readLock().lock(); try { @@ -866,7 +887,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - return nodes != null ? nodes : affNodes; + return nodes; } finally { lock.readLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 4cb113e..5ff5aa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -30,10 +31,13 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -41,14 +45,15 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** * DHT atomic cache backup update future. @@ -74,56 +79,38 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte protected final GridCacheContext cctx; /** Future version. */ - protected final GridCacheVersion futVer; - - /** Completion callback. */ - @GridToStringExclude - private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb; + @GridToStringInclude + protected final long futId; /** Update request. */ - protected final GridNearAtomicAbstractUpdateRequest updateReq; - - /** Update response. */ - final GridNearAtomicUpdateResponse updateRes; + final GridNearAtomicAbstractUpdateRequest updateReq; /** Mappings. */ - @GridToStringInclude + @GridToStringExclude protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings; /** Continuous query closures. */ private Collection<CI1<Boolean>> cntQryClsrs; - /** */ - private final boolean waitForExchange; - /** Response count. */ private volatile int resCnt; /** * @param cctx Cache context. - * @param completionCb Callback to invoke when future is completed. * @param writeVer Write version. * @param updateReq Update request. - * @param updateRes Update response. */ protected GridDhtAtomicAbstractUpdateFuture( GridCacheContext cctx, - CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, - GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes + GridNearAtomicAbstractUpdateRequest updateReq ) { this.cctx = cctx; - this.futVer = cctx.isLocalNode(updateRes.nodeId()) ? - cctx.versions().next(updateReq.topologyVersion()) : // Generate new if request mapped to local. - updateReq.futureVersion(); this.updateReq = updateReq; - this.completionCb = completionCb; - this.updateRes = updateRes; this.writeVer = writeVer; - waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); + futId = cctx.mvcc().atomicFutureId(); if (log == null) { msgLog = cctx.shared().atomicMessageLogger(); @@ -131,8 +118,15 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte } } + /** + * @return {@code True} if all updates are sent to DHT. + */ + protected abstract boolean sendAllToDht(); + /** {@inheritDoc} */ @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + boolean waitForExchange = !updateReq.topologyLocked(); + if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0) return this; @@ -141,17 +135,23 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** * @param clsr Continuous query closure. + * @param sync Synchronous continuous query flag. */ - public final void addContinuousQueryClosure(CI1<Boolean> clsr) { + public final void addContinuousQueryClosure(CI1<Boolean> clsr, boolean sync) { assert !isDone() : this; - if (cntQryClsrs == null) - cntQryClsrs = new ArrayList<>(10); + if (sync) + clsr.apply(true); + else { + if (cntQryClsrs == null) + cntQryClsrs = new ArrayList<>(10); - cntQryClsrs.add(clsr); + cntQryClsrs.add(clsr); + } } /** + * @param affAssignment Affinity assignment. * @param entry Entry to map. * @param val Value to write. * @param entryProcessor Entry processor. @@ -163,7 +163,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @param updateCntr Partition update counter. */ @SuppressWarnings("ForLoopReplaceableByForEach") - final void addWriteEntry(GridDhtCacheEntry entry, + final void addWriteEntry( + AffinityAssignment affAssignment, + GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, @@ -174,7 +176,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte long updateCntr) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); - List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); + List<ClusterNode> affNodes = affAssignment.get(entry.partition()); + + List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes); + + if (dhtNodes == null) + dhtNodes = affNodes; if (log.isDebugEnabled()) log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); @@ -193,8 +200,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte if (updateReq == null) { updateReq = createRequest( - node, - futVer, + node.id(), + futId, writeVer, syncMode, topVer, @@ -212,7 +219,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte conflictExpireTime, conflictVer, addPrevVal, - entry.partition(), prevVal, updateCntr); } @@ -239,7 +245,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). */ - final void addNearWriteEntries(Collection<UUID> readers, + final void addNearWriteEntries( + Collection<UUID> readers, GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, @@ -262,8 +269,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte continue; updateReq = createRequest( - node, - futVer, + node.id(), + futId, writeVer, syncMode, topVer, @@ -274,8 +281,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte mappings.put(nodeId, updateReq); } - addNearReaderEntry(entry); - updateReq.addNearWriteValue(entry.key(), val, entryProcessor, @@ -284,12 +289,15 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte } } - /** - * adds new nearReader. - * - * @param entry GridDhtCacheEntry. - */ - protected abstract void addNearReaderEntry(GridDhtCacheEntry entry); + /** {@inheritDoc} */ + @Override public final IgniteUuid futureId() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public final Long id() { + return futId; + } /** * @return Write version. @@ -299,21 +307,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte } /** {@inheritDoc} */ - @Override public final IgniteUuid futureId() { - return futVer.asGridUuid(); - } - - /** {@inheritDoc} */ - @Override public final GridCacheVersion version() { - return futVer; - } - - /** {@inheritDoc} */ @Override public final boolean onNodeLeft(UUID nodeId) { boolean res = registerResponse(nodeId); if (res && msgLog.isDebugEnabled()) { - msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer + + msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer + ", node=" + nodeId + ']'); } @@ -324,7 +322,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @param nodeId Node ID. * @return {@code True} if request found. */ - final boolean registerResponse(UUID nodeId) { + private boolean registerResponse(UUID nodeId) { int resCnt0; GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null; @@ -353,41 +351,103 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** * Sends requests to remote nodes. + * + * @param nearNode Near node. + * @param ret Cache operation return value. + * @param updateRes Response. + * @param completionCb Callback to invoke to send response to near node. + */ + final void map(ClusterNode nearNode, + GridCacheReturn ret, + GridNearAtomicUpdateResponse updateRes, + GridDhtAtomicCache.UpdateReplyClosure completionCb) { + if (F.isEmpty(mappings)) { + updateRes.dhtNodes(Collections.<UUID>emptyList()); + + completionCb.apply(updateReq, updateRes); + + onDone(); + + return; + } + + boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC || + !ret.emptyResult() || + updateRes.nearVersion() != null || + cctx.localNodeId().equals(nearNode.id()); + + boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht()); + + if (needMapping) { + initMapping(updateRes); + + needReplyToNear = true; + } + + sendDhtRequests(nearNode, ret); + + if (needReplyToNear) + completionCb.apply(updateReq, updateRes); + } + + /** + * @param updateRes Response. */ - final void map() { + private void initMapping(GridNearAtomicUpdateResponse updateRes) { + List<UUID> dhtNodes; + if (!F.isEmpty(mappings)) { - for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) { - try { - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + dhtNodes = new ArrayList<>(mappings.size()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DTH update fut, sent request [futId=" + futVer + - ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); - } + dhtNodes.addAll(mappings.keySet()); + } + else + dhtNodes = Collections.emptyList(); + + updateRes.dhtNodes(dhtNodes); + } + + /** + * @param nearNode Near node. + * @param ret Return value. + */ + private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret) { + for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) { + try { + assert !cctx.localNodeId().equals(req.nodeId()) : req; + + if (updateReq.fullSync()) { + req.nearReplyInfo(nearNode.id(), updateReq.futureId()); + + if (ret.emptyResult()) + req.hasResult(true); } - catch (ClusterTopologyCheckedException ignored) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer + - ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); - } - registerResponse(req.nodeId()); + if (cntQryClsrs != null) + req.replyWithoutDelay(true); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DTH update fut, sent request [futId=" + futId + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); } - catch (IgniteCheckedException ignored) { - U.error(msgLog, "Failed to send request [futId=" + futVer + + } + catch (ClusterTopologyCheckedException ignored) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); - - registerResponse(req.nodeId()); } + + registerResponse(req.nodeId()); } - } - else - onDone(); + catch (IgniteCheckedException ignored) { + U.error(msgLog, "Failed to send request [futId=" + futId + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); - // Send response right away if no ACKs from backup is required. - // Backups will send ACKs anyway, future will be completed after all backups have replied. - if (updateReq.writeSynchronizationMode() != FULL_SYNC) - completionCb.apply(updateReq, updateRes); + registerResponse(req.nodeId()); + } + } } /** @@ -395,7 +455,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * * @param nodeId Backup node ID. */ - public final void onResult(UUID nodeId) { + final void onDeferredResponse(UUID nodeId) { if (log.isDebugEnabled()) log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); @@ -403,8 +463,31 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte } /** - * @param node Node. - * @param futVer Future version. + * @param nodeId Node ID. + * @param res Response. + */ + final void onDhtResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) { + if (!F.isEmpty(res.nearEvicted())) { + for (KeyCacheObject key : res.nearEvicted()) { + try { + GridDhtCacheEntry entry = (GridDhtCacheEntry)cctx.cache().peekEx(key); + + if (entry != null) + entry.removeReader(nodeId, res.messageId()); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Entry with evicted reader was removed [key=" + key + ", err=" + e + ']'); + } + } + } + + registerResponse(nodeId); + } + + /** + * @param nodeId Node ID. + * @param futId Future ID. * @param writeVer Update version. * @param syncMode Write synchronization mode. * @param topVer Topology version. @@ -414,8 +497,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @return Request. */ protected abstract GridDhtAtomicAbstractUpdateRequest createRequest( - ClusterNode node, - GridCacheVersion futVer, + UUID nodeId, + long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, @@ -424,38 +507,18 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte @Nullable GridCacheVersion conflictVer ); - /** - * Callback for backup update response. - * - * @param nodeId Backup node ID. - * @param updateRes Update response. - */ - public abstract void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes); - - /** - * @param updateRes Response. - * @param err Error. - */ - protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err); - /** {@inheritDoc} */ @Override public final boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { - cctx.mvcc().removeAtomicFuture(version()); + cctx.mvcc().removeAtomicFuture(futId); boolean suc = err == null; - if (!suc) - addFailedKeys(updateRes, err); - if (cntQryClsrs != null) { for (CI1<Boolean> clsr : cntQryClsrs) clsr.apply(suc); } - if (updateReq.writeSynchronizationMode() == FULL_SYNC) - completionCb.apply(updateReq, updateRes); - return true; } @@ -471,4 +534,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte @Override public void markNotTrackable() { // No-op. } + + /** {@inheritDoc} */ + @Override public String toString() { + synchronized (this) { + Map<UUID, String> dhtRes = F.viewReadOnly(mappings, + new IgniteClosure<GridDhtAtomicAbstractUpdateRequest, String>() { + @Override public String apply(GridDhtAtomicAbstractUpdateRequest req) { + return "[res=" + req.hasResponse() + + ", size=" + req.size() + + ", nearSize=" + req.nearSize() + ']'; + } + } + ); + + return S.toString(GridDhtAtomicAbstractUpdateFuture.class, this, "dhtRes", dhtRes); + } + } }
