Fixes: - allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction - fixed 'full_sync' mode for case when tx primary nodes fail - fixed race between statically configured cache start and GridDhtAffinityAssignmentRequest - fixed 'prepareMarshal' methods to marshal only once (ignite-2219)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/457a9ae4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/457a9ae4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/457a9ae4 Branch: refs/heads/ignite-2236 Commit: 457a9ae4d3b0d6eef6e92a15f5ef79c15ccf1f95 Parents: 1d8c4e2 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jan 13 09:21:09 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jan 13 09:29:17 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteTransactions.java | 4 +- .../apache/ignite/internal/IgniteKernal.java | 95 +++- .../cache/CacheEntrySerializablePredicate.java | 3 +- .../cache/CacheInvokeDirectResult.java | 4 +- .../processors/cache/GridCacheIoManager.java | 23 + .../processors/cache/GridCacheProcessor.java | 52 ++- .../processors/cache/GridCacheReturn.java | 2 + .../processors/cache/IgniteCacheProxy.java | 2 +- .../GridDistributedLockResponse.java | 2 +- .../GridDistributedTxFinishRequest.java | 11 +- .../GridDistributedTxPrepareRequest.java | 2 +- .../GridDistributedTxPrepareResponse.java | 4 +- .../dht/GridDhtAffinityAssignmentResponse.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 44 +- .../distributed/dht/GridDhtLockRequest.java | 2 +- .../distributed/dht/GridDhtTxFinishRequest.java | 90 ++-- .../dht/GridDhtTxFinishResponse.java | 4 +- .../cache/distributed/dht/GridDhtTxLocal.java | 1 + .../dht/GridDhtTxPrepareRequest.java | 2 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 21 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 6 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 22 +- .../atomic/GridNearAtomicUpdateResponse.java | 4 +- .../dht/preloader/GridDhtForceKeysResponse.java | 6 +- .../GridDhtPartitionDemandMessage.java | 6 +- .../GridDhtPartitionSupplyMessageV2.java | 6 +- .../preloader/GridDhtPartitionsFullMessage.java | 2 +- .../GridDhtPartitionsSingleMessage.java | 4 +- .../distributed/near/GridNearGetResponse.java | 4 +- ...ridNearOptimisticTxPrepareFutureAdapter.java | 6 +- .../near/GridNearSingleGetResponse.java | 2 +- .../near/GridNearTxFinishFuture.java | 432 ++++++++++++++----- .../near/GridNearTxFinishRequest.java | 5 + .../near/GridNearTxFinishResponse.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../near/GridNearTxPrepareResponse.java | 4 +- .../cache/query/GridCacheQueryRequest.java | 16 +- .../cache/query/GridCacheQueryResponse.java | 18 +- .../cache/transactions/IgniteInternalTx.java | 6 + .../cache/transactions/IgniteTxAdapter.java | 23 +- .../cache/transactions/IgniteTxEntry.java | 11 +- .../cache/transactions/IgniteTxHandler.java | 26 +- .../transactions/IgniteTxLocalAdapter.java | 6 +- .../cache/transactions/IgniteTxManager.java | 20 + .../datastreamer/DataStreamerRequest.java | 1 + .../datastructures/DataStructuresProcessor.java | 11 +- .../processors/igfs/IgfsAckMessage.java | 4 +- .../handlers/cache/GridCacheCommandHandler.java | 6 +- .../ignite/spi/discovery/DiscoverySpi.java | 2 + .../ignite/stream/socket/SocketStreamer.java | 3 +- ...cheAbstractFullApiMultithreadedSelfTest.java | 13 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 2 +- .../processors/cache/GridCacheStopSelfTest.java | 2 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 30 +- .../IgniteClientDataStructuresAbstractTest.java | 3 + .../dht/GridCacheTxNodeFailureSelfTest.java | 7 +- .../IgniteCacheCommitDelayTxRecoveryTest.java | 376 ++++++++++++++++ .../IgniteCachePutRetryAbstractSelfTest.java | 36 +- ...gniteCachePutRetryTransactionalSelfTest.java | 21 + .../continuous/GridEventConsumeSelfTest.java | 3 + .../internal/util/nio/GridNioSelfTest.java | 11 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 - ...CommunicationRecoveryAckClosureSelfTest.java | 19 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 + .../junits/common/GridCommonAbstractTest.java | 34 +- .../IgniteCacheTxRecoverySelfTestSuite.java | 3 + .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 69 ++- 67 files changed, 1314 insertions(+), 358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java index 875b647..dfe6a1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java @@ -18,7 +18,7 @@ package org.apache.ignite; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -54,7 +54,7 @@ import org.apache.ignite.transactions.TransactionMetrics; public interface IgniteTransactions { /** * Starts transaction with default isolation, concurrency, timeout, and invalidation policy. - * All defaults are set in {@link CacheConfiguration} at startup. + * All defaults are set in {@link TransactionConfiguration} at startup. * * @return New transaction * @throws IllegalStateException If transaction is already started by this thread. http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 14b5816..3def718 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1424,8 +1424,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** @throws IgniteCheckedException If registration failed. */ - private void registerExecutorMBeans(ExecutorService execSvc, ExecutorService sysExecSvc, ExecutorService p2pExecSvc, - ExecutorService mgmtExecSvc, ExecutorService restExecSvc) throws IgniteCheckedException { + private void registerExecutorMBeans(ExecutorService execSvc, + ExecutorService sysExecSvc, + ExecutorService p2pExecSvc, + ExecutorService mgmtExecSvc, + ExecutorService restExecSvc) throws IgniteCheckedException { pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor"); sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor"); mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor"); @@ -2414,7 +2417,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { guard(); try { - return ctx.cache().publicJCache(name, false); + return ctx.cache().publicJCache(name, false, true); } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -2431,7 +2434,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { guard(); try { - ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, true, true).get(); + ctx.cache().dynamicStartCache(cacheCfg, + cacheCfg.getName(), + null, + true, + true, + true).get(); return ctx.cache().publicJCache(cacheCfg.getName()); } @@ -2467,8 +2475,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { guard(); try { - if (ctx.cache().cache(cacheCfg.getName()) == null) - ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, false, true).get(); + if (ctx.cache().cache(cacheCfg.getName()) == null) { + ctx.cache().dynamicStartCache(cacheCfg, + cacheCfg.getName(), + null, + false, + true, + true).get(); + } return ctx.cache().publicJCache(cacheCfg.getName()); } @@ -2491,7 +2505,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { guard(); try { - ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, true, true).get(); + ctx.cache().dynamicStartCache(cacheCfg, + cacheCfg.getName(), + nearCfg, + true, + true, + true).get(); return ctx.cache().publicJCache(cacheCfg.getName()); } @@ -2514,11 +2533,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { IgniteInternalCache<Object, Object> cache = ctx.cache().cache(cacheCfg.getName()); - if (cache == null) - ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get(); + if (cache == null) { + ctx.cache().dynamicStartCache(cacheCfg, + cacheCfg.getName(), + nearCfg, + false, + true, + true).get(); + } else { - if (cache.configuration().getNearConfiguration() == null) - ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get(); + if (cache.configuration().getNearConfiguration() == null) { + ctx.cache().dynamicStartCache(cacheCfg, + cacheCfg.getName(), + nearCfg, + false, + true, + true).get(); + } } return ctx.cache().publicJCache(cacheCfg.getName()); @@ -2538,7 +2569,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { guard(); try { - ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true, true).get(); + ctx.cache().dynamicStartCache(null, + cacheName, + nearCfg, + true, + true, + true).get(); IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); @@ -2564,11 +2600,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { IgniteInternalCache<Object, Object> internalCache = ctx.cache().cache(cacheName); - if (internalCache == null) - ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get(); + if (internalCache == null) { + ctx.cache().dynamicStartCache(null, + cacheName, + nearCfg, + false, + true, + true).get(); + } else { - if (internalCache.configuration().getNearConfiguration() == null) - ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get(); + if (internalCache.configuration().getNearConfiguration() == null) { + ctx.cache().dynamicStartCache(null, + cacheName, + nearCfg, + false, + true, + true).get(); + } } IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName); @@ -2587,6 +2635,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * @param cache Cache. + * @throws IgniteCheckedException If cache without near cache was already started. */ private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) throws IgniteCheckedException { if (!cache.context().isNear()) @@ -2596,7 +2645,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** {@inheritDoc} */ @Override public void destroyCache(String cacheName) { - IgniteInternalFuture stopFut = destroyCacheAsync(cacheName); + IgniteInternalFuture stopFut = destroyCacheAsync(cacheName, true); try { stopFut.get(); @@ -2608,13 +2657,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * @param cacheName Cache name. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Ignite future. */ - public IgniteInternalFuture<?> destroyCacheAsync(String cacheName) { + public IgniteInternalFuture<?> destroyCacheAsync(String cacheName, boolean checkThreadTx) { guard(); try { - return ctx.cache().dynamicDestroyCache(cacheName); + return ctx.cache().dynamicDestroyCache(cacheName, checkThreadTx); } finally { unguard(); @@ -2627,7 +2677,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { if (ctx.cache().cache(cacheName) == null) - ctx.cache().getOrCreateFromTemplate(cacheName).get(); + ctx.cache().getOrCreateFromTemplate(cacheName, true).get(); return ctx.cache().publicJCache(cacheName); } @@ -2641,14 +2691,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * @param cacheName Cache name. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Future that will be completed when cache is deployed. */ - public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName) { + public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName, boolean checkThreadTx) { guard(); try { if (ctx.cache().cache(cacheName) == null) - return ctx.cache().getOrCreateFromTemplate(cacheName); + return ctx.cache().getOrCreateFromTemplate(cacheName, checkThreadTx); return new GridFinishedFuture<>(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java index a243c4e..20cc005 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java @@ -86,7 +86,8 @@ public class CacheEntrySerializablePredicate implements CacheEntryPredicate { p.prepareMarshal(ctx); - bytes = ctx.marshaller().marshal(p); + if (bytes == null) + bytes = ctx.marshaller().marshal(p); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java index bee1427..fefa422 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java @@ -104,7 +104,7 @@ public class CacheInvokeDirectResult implements Message { public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { key.prepareMarshal(ctx.cacheObjectContext()); - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); if (res != null) @@ -119,7 +119,7 @@ public class CacheInvokeDirectResult implements Message { public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { key.finishUnmarshal(ctx.cacheObjectContext(), ldr); - if (errBytes != null) + if (errBytes != null && err == null) err = ctx.marshaller().unmarshal(errBytes, ldr); if (res != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 0aa8b1b..b297827 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; @@ -122,6 +123,28 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { IgniteInternalFuture<?> fut = null; if (cacheMsg.partitionExchangeMessage()) { + if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) { + assert cacheMsg.topologyVersion() != null : cacheMsg; + + AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order()); + + assert cacheMsg.topologyVersion().compareTo(startTopVer) > 0 : + "Invalid affinity request [startTopVer=" + startTopVer + ", msg=" + cacheMsg + ']'; + + // Need to wait for initial exchange to avoid race between cache start and affinity request. + fut = cctx.exchange().affinityReadyFuture(startTopVer); + + if (fut != null && !fut.isDone()) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + lsnr.onMessage(nodeId, cacheMsg); + } + }); + + return; + } + } + long locTopVer = cctx.discovery().topologyVersion(); long rmtTopVer = cacheMsg.topologyVersion().topologyVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ff02e70..eb6d98e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2030,7 +2030,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { CacheConfiguration cfg = createConfigFromTemplate(cacheName); - return dynamicStartCache(cfg, cacheName, null, true, true); + return dynamicStartCache(cfg, cacheName, null, true, true, true); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -2041,16 +2041,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Dynamically starts cache using template configuration. * * @param cacheName Cache name. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Future that will be completed when cache is deployed. */ - public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName) { + public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean checkThreadTx) { try { - if (publicJCache(cacheName, false) != null) // Cache with given name already started. + if (publicJCache(cacheName, false, checkThreadTx) != null) // Cache with given name already started. return new GridFinishedFuture<>(); CacheConfiguration cfg = createConfigFromTemplate(cacheName); - return dynamicStartCache(cfg, cacheName, null, false, true); + return dynamicStartCache(cfg, cacheName, null, false, true, checkThreadTx); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -2060,6 +2061,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param cacheName Cache name. * @return Cache configuration. + * @throws IgniteCheckedException If failed. */ private CacheConfiguration createConfigFromTemplate(String cacheName) throws IgniteCheckedException { CacheConfiguration cfgTemplate = null; @@ -2138,6 +2140,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cacheName Cache name. * @param nearCfg Near cache configuration. * @param failIfExists Fail if exists flag. + * @param failIfNotStarted If {@code true} fails if cache is not started. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Future that will be completed when cache is deployed. */ @SuppressWarnings("IfMayBeConditional") @@ -2146,9 +2150,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { String cacheName, @Nullable NearCacheConfiguration nearCfg, boolean failIfExists, - boolean failIfNotStarted + boolean failIfNotStarted, + boolean checkThreadTx ) { - return dynamicStartCache(ccfg, cacheName, nearCfg, CacheType.USER, failIfExists, failIfNotStarted); + return dynamicStartCache(ccfg, + cacheName, + nearCfg, + CacheType.USER, + failIfExists, + failIfNotStarted, + checkThreadTx); } /** @@ -2157,7 +2168,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param ccfg Cache configuration. * @param cacheName Cache name. * @param nearCfg Near cache configuration. + * @param cacheType Cache type. * @param failIfExists Fail if exists flag. + * @param failIfNotStarted If {@code true} fails if cache is not started. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Future that will be completed when cache is deployed. */ @SuppressWarnings("IfMayBeConditional") @@ -2167,9 +2181,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { @Nullable NearCacheConfiguration nearCfg, CacheType cacheType, boolean failIfExists, - boolean failIfNotStarted + boolean failIfNotStarted, + boolean checkThreadTx ) { - checkEmptyTransactions(); + if (checkThreadTx) + checkEmptyTransactions(); DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); @@ -2260,10 +2276,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param cacheName Cache name to destroy. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Future that will be completed when cache is destroyed. */ - public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) { - checkEmptyTransactions(); + public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName, boolean checkThreadTx) { + if (checkThreadTx) + checkEmptyTransactions(); DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId()); @@ -2898,7 +2916,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked); if (cache == null) { - dynamicStartCache(null, name, null, false, true).get(); + dynamicStartCache(null, name, null, false, true, true).get(); cache = jCacheProxies.get(masked); } @@ -3001,21 +3019,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException { - return publicJCache(cacheName, true); + return publicJCache(cacheName, true, true); } /** * @param cacheName Cache name. * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started, * otherwise returns {@code null} in this case. - * @param <K> type of keys. - * @param <V> type of values. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @return Cache instance for given name. * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted) - throws IgniteCheckedException + @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, + boolean failIfNotStarted, + boolean checkThreadTx) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Getting public cache for name: " + cacheName); @@ -3030,7 +3048,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName); if (cache == null) { - dynamicStartCache(null, cacheName, null, false, failIfNotStarted).get(); + dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get(); cache = jCacheProxies.get(masked); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java index 21154c9..a9edb95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java @@ -169,6 +169,7 @@ public class GridCacheReturn implements Externalizable, Message { * @param cctx Cache context. * @param cacheObj Value to set. * @param success Success flag to set. + * @param keepBinary Keep binary flag. * @return This instance for chaining. */ public GridCacheReturn set( @@ -187,6 +188,7 @@ public class GridCacheReturn implements Externalizable, Message { /** * @param cctx Cache context. * @param cacheObj Cache object. + * @param keepBinary Keep binary flag. */ private void initValue(GridCacheContext cctx, @Nullable CacheObject cacheObj, boolean keepBinary) { if (loc) http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 27a7587..b64c69c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1626,7 +1626,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V IgniteInternalFuture<?> fut; try { - fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name()); + fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), true); } finally { onLeave(gate); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java index bb3f9ff..f088e1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -194,7 +194,7 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId)); - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index 34b3112..a761fec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.lang.IgniteUuid; @@ -85,6 +83,8 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { * @param invalidate Invalidate flag. * @param sys System transaction flag. * @param plc IO policy. + * @param syncCommit Sync commit flag. + * @param syncRollback Sync rollback flag. * @param baseVer Base version. * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. @@ -184,6 +184,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { } /** + * @param syncCommit Sync commit flag. + */ + public void syncCommit(boolean syncCommit) { + this.syncCommit = syncCommit; + } + + /** * @return Sync rollback flag. */ public boolean syncRollback() { http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index e595942..0d26c84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -317,7 +317,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage if (reads != null) marshalTx(reads, ctx); - if (dhtVers != null) { + if (dhtVers != null && dhtVerKeys == null) { for (IgniteTxKey key : dhtVers.keySet()) { GridCacheContext cctx = ctx.cacheContext(key.cacheId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java index d2c5aa4..4d22213 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java @@ -93,7 +93,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); } @@ -101,7 +101,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (errBytes != null) + if (errBytes != null && err == null) err = ctx.marshaller().unmarshal(errBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index e731406..8e041c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -104,7 +104,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (affAssignment != null) + if (affAssignment != null && affAssignmentBytes == null) affAssignmentBytes = ctx.marshaller().marshal(affAssignment); } @@ -113,7 +113,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (affAssignmentBytes != null) { + if (affAssignmentBytes != null && affAssignment == null) { affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr); // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented. http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 98711b8..1c3e052 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -494,14 +494,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> boolean found = false; for (IgniteInternalFuture<?> fut : futures()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MiniFuture f = (MiniFuture)fut; - if (f.node().id().equals(nodeId)) { - f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId)); + if (f.node().id().equals(nodeId)) { + f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId)); - found = true; - } + found = true; } } @@ -551,12 +549,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> synchronized (futs) { // Avoid iterator creation. for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture<Boolean> fut = futs.get(i); - - if (!isMini(fut)) - continue; - - MiniFuture mini = (MiniFuture)fut; + MiniFuture mini = (MiniFuture)futs.get(i); if (mini.futureId().equals(miniId)) { if (!mini.isDone()) @@ -772,14 +765,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> } /** - * @param f Future. - * @return {@code True} if mini-future. - */ - private boolean isMini(IgniteInternalFuture<?> f) { - return f.getClass().equals(MiniFuture.class); - } - - /** * */ public void map() { @@ -1006,7 +991,24 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDhtLockFuture.class, this, super.toString()); + Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { + @Override public String apply(IgniteInternalFuture<?> f) { + MiniFuture m = (MiniFuture)f; + + return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; + } + }); + + Collection<KeyCacheObject> locks; + + synchronized (this) { + locks = new HashSet<>(pendingLocks); + } + + return S.toString(GridDhtLockFuture.class, this, + "innerFuts", futs, + "pendingLocks", locks, + "super", super.toString()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 62cf69d..50167d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -311,7 +311,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { prepareMarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId)); - if (owned != null) { + if (owned != null && ownedKeys == null) { ownedKeys = new KeyCacheObject[owned.size()]; ownedValues = new GridCacheVersion[ownedKeys.length]; http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 65f1cb4..2d98e0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; -import java.util.Collections; import java.util.UUID; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -44,6 +43,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** */ private static final long serialVersionUID = 0L; + /** */ + public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01; + /** Near node ID. */ private UUID nearNodeId; @@ -64,7 +66,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { @GridDirectCollection(GridCacheVersion.class) private Collection<GridCacheVersion> pendingVers; - /** Check comitted flag. */ + /** Check committed flag. */ private boolean checkCommitted; /** Partition update counter. */ @@ -81,6 +83,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** Task name hash. */ private int taskNameHash; + /** */ + private byte flags; + /** * Empty constructor required for {@link Externalizable}. */ @@ -100,6 +105,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { * @param commit Commit flag. * @param invalidate Invalidate flag. * @param sys System flag. + * @param plc IO policy. * @param sysInvalidate System invalidation flag. * @param syncCommit Synchronous commit flag. * @param syncRollback Synchronous rollback flag. @@ -180,6 +186,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { * @param commit Commit flag. * @param invalidate Invalidate flag. * @param sys System flag. + * @param plc IO policy. * @param sysInvalidate System invalidation flag. * @param syncCommit Synchronous commit flag. * @param syncRollback Synchronous rollback flag. @@ -302,16 +309,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { } /** - * Gets versions of not acquired locks with version less then one of transaction being committed. - * - * @return Versions of locks for entries participating in transaction that have not been acquired yet - * have version less then one of transaction being committed. - */ - public Collection<GridCacheVersion> pendingVersions() { - return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers; - } - - /** * @return Check committed flag. */ public boolean checkCommitted() { @@ -325,6 +322,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { this.checkCommitted = checkCommitted; } + /** + * @return {@code True} + */ + public boolean waitRemoteTransactions() { + return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0; + } + + /** + * @param waitRemoteTxs Wait remote transactions flag. + */ + public void waitRemoteTransactions(boolean waitRemoteTxs) { + if (waitRemoteTxs) + flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK); + else + flags &= ~WAIT_REMOTE_TX_FLAG_MASK; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtTxFinishRequest.class, this, super.toString()); @@ -352,60 +366,66 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); case 19: - if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 20: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) return false; writer.incrementState(); case 21: - if (!writer.writeUuid("nearNodeId", nearNodeId)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 23: - if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) return false; writer.incrementState(); case 24: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 26: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) return false; writer.incrementState(); case 27: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 28: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 29: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -436,6 +456,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 19: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -447,7 +475,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 20: + case 21: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -455,7 +483,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 21: + case 22: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ -463,7 +491,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 22: + case 23: partUpdateCnt = reader.readMessage("partUpdateCnt"); if (!reader.isLastRead()) @@ -471,7 +499,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 23: + case 24: pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -479,7 +507,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 24: + case 25: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -487,7 +515,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 25: + case 26: sysInvalidate = reader.readBoolean("sysInvalidate"); if (!reader.isLastRead()) @@ -495,7 +523,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 26: + case 27: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -503,7 +531,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 27: + case 28: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -511,7 +539,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 28: + case 29: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -531,6 +559,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 29; + return 30; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java index fb4d97d..626ad89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java @@ -109,7 +109,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (checkCommittedErr != null) + if (checkCommittedErr != null && checkCommittedErrBytes == null) checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr); } @@ -118,7 +118,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (checkCommittedErrBytes != null) + if (checkCommittedErrBytes != null && checkCommittedErr == null) checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index e026b4e..ebf1002 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -421,6 +421,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (!state(PREPARING)) { if (state() == PREPARED && isSystemInvalidate()) fut.complete(); + if (setRollbackOnly()) { if (timedOut()) fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 394ff89..d31ecba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -281,7 +281,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (owned != null) { + if (owned != null && ownedKeys == null) { ownedKeys = owned.keySet(); ownedVals = owned.values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 7bee5a3..7cc276f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -656,11 +656,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid if (!addDepInfo && ctx.deploymentEnabled()) addDepInfo = true; - invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + if (invokeArgsBytes == null) + invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); - entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + if (entryProcessorsBytes == null) + entryProcessorsBytes = marshalCollection(entryProcessors, cctx); - nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx); + if (nearEntryProcessorsBytes == null) + nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx); } } @@ -681,13 +684,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid finishUnmarshalCacheObjects(prevVals, cctx, ldr); if (forceTransformBackups) { - entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + if (entryProcessors == null) + entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); - invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); - } + if (invokeArgs == null) + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); - if (forceTransformBackups) - nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); + if (nearEntryProcessors == null) + nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index f1bb323..95fdeb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -165,7 +165,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri prepareMarshalCacheObjects(nearEvicted, cctx); - errBytes = ctx.marshaller().marshal(err); + if (err != null && errBytes == null) + errBytes = ctx.marshaller().marshal(err); } /** {@inheritDoc} */ @@ -178,7 +179,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri finishUnmarshalCacheObjects(nearEvicted, cctx, ldr); - err = ctx.marshaller().unmarshal(errBytes, ldr); + if (errBytes != null && err == null) + err = ctx.marshaller().unmarshal(errBytes, ldr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 7c0aba5..9c4b486 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -184,6 +184,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. + * @param keepBinary Keep binary flag. * @param clientReq Client node request flag. * @param addDepInfo Deployment info flag. */ @@ -593,7 +594,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri filter = null; } - if (expiryPlc != null) + if (expiryPlc != null && expiryPlcBytes == null) expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); if (op == TRANSFORM) { @@ -601,9 +602,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri if (!addDepInfo && ctx.deploymentEnabled()) addDepInfo = true; - entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + if (entryProcessorsBytes == null) + entryProcessorsBytes = marshalCollection(entryProcessors, cctx); - invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + if (invokeArgsBytes == null) + invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); } else prepareMarshalCacheObjects(vals, cctx); @@ -617,8 +620,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri finishUnmarshalCacheObjects(keys, cctx, ldr); - if (op == TRANSFORM) - entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + if (op == TRANSFORM) { + if (entryProcessors == null) + entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + + if (invokeArgs == null) + invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); + } else finishUnmarshalCacheObjects(vals, cctx, ldr); @@ -629,9 +637,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } } - invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); - - if (expiryPlcBytes != null) + if (expiryPlcBytes != null && expiryPlc == null) expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index b164e7e..3e3ac29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -394,7 +394,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); GridCacheContext cctx = ctx.cacheContext(cacheId); @@ -413,7 +413,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (errBytes != null) + if (errBytes != null && err == null) err = ctx.marshaller().unmarshal(errBytes, ldr); GridCacheContext cctx = ctx.cacheContext(cacheId); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java index 4cdecec..9c5238a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java @@ -170,7 +170,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa info.marshal(cctx); } - errBytes = ctx.marshaller().marshal(err); + if (err != null && errBytes == null) + errBytes = ctx.marshaller().marshal(err); } /** {@inheritDoc} */ @@ -187,7 +188,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa info.unmarshal(cctx, ldr); } - err = ctx.marshaller().unmarshal(errBytes, ldr); + if (errBytes != null && err == null) + err = ctx.marshaller().unmarshal(errBytes, ldr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 53c3d90..5cb84dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -66,6 +66,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { /** * @param updateSeq Update sequence for this node. * @param topVer Topology version. + * @param cacheId Cache ID. */ GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) { this.cacheId = cacheId; @@ -75,6 +76,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { /** * @param cp Message to copy from. + * @param parts Partitions. */ GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts) { cacheId = cp.cacheId; @@ -181,7 +183,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (topic != null) + if (topic != null && topicBytes == null) topicBytes = ctx.marshaller().marshal(topic); } @@ -189,7 +191,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (topicBytes != null) + if (topicBytes != null && topic == null) topic = ctx.marshaller().unmarshal(topicBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java index 41454f9..4451cbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java @@ -75,9 +75,13 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements /** * @param updateSeq Update sequence for this node. * @param cacheId Cache ID. + * @param topVer Topology version. * @param addDepInfo Deployment info flag. */ - GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) { + GridDhtPartitionSupplyMessageV2(long updateSeq, + int cacheId, + AffinityTopologyVersion topVer, + boolean addDepInfo) { this.cacheId = cacheId; this.updateSeq = updateSeq; this.topVer = topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 0cbdc91..6afb9b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -134,7 +134,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (parts != null && partsBytes == null) partsBytes = ctx.marshaller().marshal(parts); - if (partCntrs != null) + if (partCntrs != null && partCntrsBytes == null) partCntrsBytes = ctx.marshaller().marshal(partCntrs); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index c07a508..1185913 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -138,7 +138,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes == null && parts != null) partsBytes = ctx.marshaller().marshal(parts); - if (partCntrs != null) + if (partCntrsBytes == null && partCntrs != null) partCntrsBytes = ctx.marshaller().marshal(partCntrs); } @@ -149,7 +149,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); - if (partCntrsBytes != null) + if (partCntrsBytes != null && partCntrs == null) partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index 15a791f..6ac91cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -188,7 +188,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe info.marshal(cctx); } - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); } @@ -203,7 +203,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe info.unmarshal(cctx, ldr); } - if (errBytes != null) + if (errBytes != null && err == null) err = ctx.marshaller().unmarshal(errBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index fe6180a..7132567 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -54,9 +54,13 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT AffinityTopologyVersion topVer = null; - if (tx.system()) + if (tx.system()) { topVer = tx.topologyVersionSnapshot(); + if (topVer == null) + topVer = cctx.exchange().readyAffinityVersion(); + } + if (topVer == null) topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java index 42ad7ed..314c35c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java @@ -167,7 +167,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC ((GridCacheEntryInfo)res).marshal(cctx); } - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); }