IGNITE-4827: Remove compatibility logic for 1.x versions. This closes #1654.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/488b25e1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/488b25e1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/488b25e1 Branch: refs/heads/master Commit: 488b25e191d66eb970cba9339c41bc0d88479878 Parents: 12e240a Author: tledkov-gridgain <[email protected]> Authored: Mon Mar 27 15:18:01 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Mar 27 15:18:01 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 5 +- .../ignite/internal/binary/BinaryContext.java | 12 +- .../communication/GridIoMessageFactory.java | 5 +- .../discovery/GridDiscoveryManager.java | 38 +- .../cache/CacheAffinitySharedManager.java | 4 - .../processors/cache/GridCacheAdapter.java | 184 ++----- .../GridCachePartitionExchangeManager.java | 62 +-- .../binary/CacheObjectBinaryProcessorImpl.java | 39 -- .../dht/GridClientPartitionTopology.java | 34 +- .../dht/GridDhtAffinityAssignmentResponse.java | 101 +--- .../dht/GridDhtPartitionTopology.java | 6 +- .../dht/GridDhtPartitionTopologyImpl.java | 34 +- .../dht/GridPartitionedGetFuture.java | 8 +- .../dht/GridPartitionedSingleGetFuture.java | 53 +- .../dht/preloader/GridDhtPartitionDemander.java | 505 ++----------------- .../dht/preloader/GridDhtPartitionFullMap.java | 45 +- .../dht/preloader/GridDhtPartitionMap.java | 210 +++++++- .../dht/preloader/GridDhtPartitionMap2.java | 329 ------------ .../GridDhtPartitionsAbstractMessage.java | 4 - .../GridDhtPartitionsExchangeFuture.java | 2 +- .../preloader/GridDhtPartitionsFullMessage.java | 6 +- .../GridDhtPartitionsSingleMessage.java | 10 +- .../dht/preloader/GridDhtPreloader.java | 18 +- .../CacheContinuousQueryBatchAck.java | 4 - .../continuous/CacheContinuousQueryHandler.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 87 +--- .../store/GridCacheStoreManagerAdapter.java | 25 - .../closure/GridClosureProcessor.java | 398 +-------------- .../continuous/GridContinuousProcessor.java | 3 - .../h2/twostep/messages/GridQueryRequest.java | 368 -------------- .../service/GridServiceProcessor.java | 132 +---- .../ignite/internal/visor/cache/VisorCache.java | 84 ++- .../cache/VisorCacheAggregatedMetrics.java | 2 +- .../visor/cache/VisorCacheConfiguration.java | 20 +- .../internal/visor/cache/VisorCacheMetrics.java | 26 + .../cache/VisorCacheMetricsCollectorTask.java | 18 +- .../visor/cache/VisorCacheMetricsV2.java | 66 --- .../cache/VisorCacheQueryConfiguration.java | 11 + .../cache/VisorCacheQueryConfigurationV2.java | 47 -- .../cache/VisorCacheStoreConfiguration.java | 12 + .../cache/VisorCacheStoreConfigurationV2.java | 48 -- .../internal/visor/cache/VisorCacheV2.java | 73 --- .../internal/visor/cache/VisorCacheV3.java | 52 -- .../internal/visor/cache/VisorCacheV4.java | 124 ----- .../visor/event/VisorGridDiscoveryEvent.java | 18 +- .../visor/event/VisorGridDiscoveryEventV2.java | 80 --- .../visor/node/VisorNodeDataCollectorJob.java | 52 +- .../internal/visor/query/VisorQueryArg.java | 39 +- .../internal/visor/query/VisorQueryArgV2.java | 49 -- .../internal/visor/query/VisorQueryArgV3.java | 51 -- .../internal/visor/query/VisorQueryJob.java | 8 +- .../internal/visor/util/VisorEventMapper.java | 4 +- .../internal/visor/util/VisorTaskUtils.java | 15 - .../communication/tcp/TcpCommunicationSpi.java | 17 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 104 +--- .../spi/discovery/tcp/TcpDiscoverySpi.java | 9 - .../messages/TcpDiscoveryClientAckResponse.java | 4 - .../resources/META-INF/classnames.properties | 106 ++-- ...CacheExchangeMessageDuplicatedStateTest.java | 12 +- .../dht/GridCacheDhtPreloadDelayedSelfTest.java | 12 +- .../dht/GridCacheDhtPreloadSelfTest.java | 4 +- .../GridCacheRebalancingSyncSelfTest.java | 7 +- .../TcpDiscoverySpiFailureTimeoutSelfTest.java | 60 --- .../junits/common/GridCommonAbstractTest.java | 4 +- .../query/h2/twostep/GridMapQueryExecutor.java | 39 -- .../h2/twostep/GridReduceQueryExecutor.java | 62 +-- .../commands/cache/VisorCacheCommand.scala | 19 +- .../commands/cache/VisorCacheScanCommand.scala | 2 +- .../commands/disco/VisorDiscoveryCommand.scala | 2 +- .../cache/WaitMapExchangeFinishCallable.java | 4 +- .../IgniteFailoverAbstractBenchmark.java | 4 +- 71 files changed, 741 insertions(+), 3362 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 5416ff0..ce79b4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -34,12 +34,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.marshaller.MappingExchangeResult; @@ -121,7 +120,7 @@ public class MarshallerContextImpl implements MarshallerContext { processResource(jdkClsNames); checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE); - checkHasClassName(GridDhtPartitionMap2.class.getName(), ldr, CLS_NAMES_FILE); + checkHasClassName(GridDhtPartitionMap.class.getName(), ldr, CLS_NAMES_FILE); checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE); if (plugins != null && !plugins.isEmpty()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java index febfb04..5f9e4ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java @@ -189,12 +189,12 @@ public class BinaryContext { sysClss.add(IgfsClientUpdateCallable.class.getName()); // Closure processor classes. - sysClss.add(GridClosureProcessor.C1V2.class.getName()); - sysClss.add(GridClosureProcessor.C1MLAV2.class.getName()); - sysClss.add(GridClosureProcessor.C2V2.class.getName()); - sysClss.add(GridClosureProcessor.C2MLAV2.class.getName()); - sysClss.add(GridClosureProcessor.C4V2.class.getName()); - sysClss.add(GridClosureProcessor.C4MLAV2.class.getName()); + sysClss.add(GridClosureProcessor.C1.class.getName()); + sysClss.add(GridClosureProcessor.C1MLA.class.getName()); + sysClss.add(GridClosureProcessor.C2.class.getName()); + sysClss.add(GridClosureProcessor.C2MLA.class.getName()); + sysClss.add(GridClosureProcessor.C4.class.getName()); + sysClss.add(GridClosureProcessor.C4MLA.class.getName()); sysClss.add(IgniteUuid.class.getName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 1d84ead..7bf3de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -141,7 +141,6 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; -import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse; import org.apache.ignite.internal.util.GridByteArrayList; @@ -793,8 +792,8 @@ public class GridIoMessageFactory implements MessageFactory { break; case 110: - msg = new GridQueryRequest(); - + // EMPTY type + // GridQueryRequest was removed break; case 111: http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index b2c4ced..b261a56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -69,7 +69,6 @@ import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; @@ -1111,12 +1110,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']'); } - boolean rmtLateAssign; - - if (n.version().compareToIgnoreTimestamp(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0) - rmtLateAssign = n.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); - else - rmtLateAssign = false; + boolean rmtLateAssign = n.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); if (locDelayAssign != rmtLateAssign) { throw new IgniteCheckedException("Remote node has cache affinity assignment mode different from local " + @@ -1127,26 +1121,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", rmtAddrs=" + U.addressesAsString(n) + ']'); } - if (n.version().compareToIgnoreTimestamp(GridServiceProcessor.LAZY_SERVICES_CFG_SINCE) >= 0) { - Boolean rmtSrvcCompatibilityEnabled = n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); - - if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) { - throw new IgniteCheckedException("Local node's " + IGNITE_SERVICES_COMPATIBILITY_MODE + - " property value differs from remote node's value " + - "(to make sure all nodes in topology have identical IgniteServices compatibility mode enabled, " + - "configure system property explicitly) " + - "[locSrvcCompatibilityEnabled=" + locSrvcCompatibilityEnabled + - ", rmtSrvcCompatibilityEnabled=" + rmtSrvcCompatibilityEnabled + - ", locNodeAddrs=" + U.addressesAsString(locNode) + - ", rmtNodeAddrs=" + U.addressesAsString(n) + - ", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']'); - } - } - else if (Boolean.FALSE.equals(locSrvcCompatibilityEnabled)) { - throw new IgniteCheckedException("Remote node doesn't support lazy services configuration and " + - "local node cannot join node because local node's " - + IGNITE_SERVICES_COMPATIBILITY_MODE + " property value explicitly set to 'false'" + - "[locNodeAddrs=" + U.addressesAsString(locNode) + + Boolean rmtSrvcCompatibilityEnabled = n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); + + if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) { + throw new IgniteCheckedException("Local node's " + IGNITE_SERVICES_COMPATIBILITY_MODE + + " property value differs from remote node's value " + + "(to make sure all nodes in topology have identical IgniteServices compatibility mode enabled, " + + "configure system property explicitly) " + + "[locSrvcCompatibilityEnabled=" + locSrvcCompatibilityEnabled + + ", rmtSrvcCompatibilityEnabled=" + rmtSrvcCompatibilityEnabled + + ", locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" + U.addressesAsString(n) + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index d287188..35d68e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -69,9 +68,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; */ @SuppressWarnings("ForLoopReplaceableByForEach") public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { - /** */ - public static final IgniteProductVersion LATE_AFF_ASSIGN_SINCE = IgniteProductVersion.fromString("1.6.0"); - /** Late affinity assignment flag. */ private boolean lateAffAssign; http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3bfd1f8..63c46c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -100,7 +100,6 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFi import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; -import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -129,7 +128,6 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.IgniteInstanceResource; @@ -165,12 +163,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Maximum number of retries when topology changes. */ public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100); - /** */ - public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = IgniteProductVersion.fromString("1.5.7"); - - /** */ - public static final IgniteProductVersion LOAD_CACHE_JOB_V2_SINCE = IgniteProductVersion.fromString("1.5.19"); - /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, String>>() { @@ -3485,7 +3477,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * * @param keys Keys. * @param plc Expiry policy. - * @param keepBinary Keep binary flag. Will be ignored for releases older than {@link #LOAD_CACHE_JOB_V2_SINCE}. + * @param keepBinary Keep binary flag. * @return Operation future. */ private IgniteInternalFuture<?> runLoadKeysCallable(final Set<? extends K> keys, final ExpiryPolicy plc, @@ -3495,27 +3487,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (nodes.isEmpty()) return new GridFinishedFuture<>(); - Collection<ClusterNode> oldNodes = ctx.grid().cluster().forDataNodes(name()).forPredicate( - new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0; - } - }).nodes(); - - if (oldNodes.isEmpty()) { return ctx.closures().callAsyncNoFailover(BROADCAST, - new LoadKeysCallableV2<>(ctx.name(), keys, update, plc, keepBinary), + new LoadKeysCallable<>(ctx.name(), keys, update, plc, keepBinary), nodes, true, 0); - } - else { - return ctx.closures().callAsyncNoFailover(BROADCAST, - new LoadKeysCallable<>(ctx.name(), keys, update, plc), - nodes, - true, - 0); - } } /** @@ -3617,27 +3593,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { - ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()) - .forPredicate(new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) < 0; - } - }); - - ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()) - .forPredicate(new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0 && - node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0; - } - }); - - ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()) - .forPredicate(new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) >= 0; - } - }); ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); @@ -3645,37 +3600,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null; - GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(); - - if (!F.isEmpty(oldNodes.nodes())) { - ComputeTaskInternalFuture oldNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, - Collections.singletonList(new LoadCacheClosure<>(ctx.name(), p, args, plc)), - oldNodes.nodes()); + Collection<ClusterNode> nodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).nodes(); - fut.add(oldNodesFut); - } - - if (!F.isEmpty(newNodes.nodes())) { - ComputeTaskInternalFuture newNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, - Collections.singletonList( - new LoadCacheJob<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc)), - newNodes.nodes()); + assert !F.isEmpty(nodes) : "There are not datanodes fo cache: " + ctx.name(); - fut.add(newNodesFut); - } - - if (!F.isEmpty(newNodesV2.nodes())) { - final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - - ComputeTaskInternalFuture newNodesV2Fut = ctx.kernalContext().closure().callAsync(BROADCAST, - Collections.singletonList( - new LoadCacheJobV2<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)), - newNodesV2.nodes()); + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - fut.add(newNodesV2Fut); - } + ComputeTaskInternalFuture fut = ctx.kernalContext().closure().callAsync(BROADCAST, + Collections.singletonList( + new LoadCacheJobV2<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)), + nodes); - fut.markInitialized(); return fut; } @@ -3786,8 +3721,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Collection<ClusterNode> nodes = grp.forPredicate(new IgnitePredicate<ClusterNode>() { /** {@inheritDoc} */ @Override public boolean apply(ClusterNode clusterNode) { - return clusterNode.version().compareTo(PartitionSizeLongTask.SINCE_VER) >= 0 && - ((modes.primary && aff.primaryByPartition(clusterNode, part, topVer)) || + return ((modes.primary && aff.primaryByPartition(clusterNode, part, topVer)) || (modes.backup && aff.backupByPartition(clusterNode, part, topVer))); } }).nodes(); @@ -3934,6 +3868,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * @param opCtx Cache operation context. * @return JCache Iterator. */ private Iterator<Cache.Entry<K, V>> localIteratorHonorExpirePolicy(final CacheOperationContext opCtx) { @@ -3978,7 +3913,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @param keepBinary + * @param keepBinary Keep binary flag. * @return Distributed ignite cache iterator. * @throws IgniteCheckedException If failed. */ @@ -4339,6 +4274,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param tx Transaction. * @param op Cache operation. + * @param opCtx Cache operation context. * @param <T> Return type. * @return Future. */ @@ -5887,6 +5823,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private ExpiryPolicy plc; + /** */ + private boolean keepBinary; + /** * Required by {@link Externalizable}. */ @@ -5900,30 +5839,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} * otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}. * @param plc Expiry policy. + * @param keepBinary Keep binary flag. */ - LoadKeysCallable(String cacheName, - Collection<? extends K> keys, - boolean update, - ExpiryPolicy plc) { + LoadKeysCallable(final String cacheName, final Collection<? extends K> keys, final boolean update, + final ExpiryPolicy plc, final boolean keepBinary) { this.cacheName = cacheName; this.keys = keys; this.update = update; this.plc = plc; + this.keepBinary = keepBinary; } /** {@inheritDoc} */ @Override public Void call() throws Exception { - return call0(false); - } - - /** - * Internal call routine. - * - * @param keepBinary Keep binary flag. - * @return Result (always {@code null}). - * @throws Exception If failed. - */ - protected Void call0(boolean keepBinary) throws Exception { GridCacheAdapter<K, V> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); assert cache != null : cacheName; @@ -5944,7 +5872,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { + @Override public void writeExternal(final ObjectOutput out) throws IOException { U.writeString(out, cacheName); U.writeCollection(out, keys); @@ -5952,10 +5880,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V out.writeBoolean(update); out.writeObject(plc != null ? new IgniteExternalizableExpiryPolicy(plc) : null); + + out.writeBoolean(keepBinary); } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { cacheName = U.readString(in); keys = U.readCollection(in); @@ -5963,56 +5893,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V update = in.readBoolean(); plc = (ExpiryPolicy)in.readObject(); - } - } - - /** - * - */ - static class LoadKeysCallableV2<K, V> extends LoadKeysCallable<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private boolean keepBinary; - - /** - * Required by {@link Externalizable}. - */ - public LoadKeysCallableV2() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param keys Keys. - * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} - * otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}. - * @param plc Expiry policy. - * @param keepBinary Keep binary flag. - */ - LoadKeysCallableV2(final String cacheName, final Collection<? extends K> keys, final boolean update, - final ExpiryPolicy plc, final boolean keepBinary) { - super(cacheName, keys, update, plc); - - this.keepBinary = keepBinary; - } - - /** {@inheritDoc} */ - @Override public Void call() throws Exception { - return call0(keepBinary); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(final ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeBoolean(keepBinary); - } - - /** {@inheritDoc} */ - @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); keepBinary = in.readBoolean(); } @@ -6531,9 +6411,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; - /** */ - private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.30"); - /** Partition */ private final int partition; @@ -6616,9 +6493,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; - /** */ - public static final IgniteProductVersion NEAR_JOB_SINCE = IgniteProductVersion.fromString("1.5.0"); - /** Cache name. */ private final String cacheName; @@ -6652,7 +6526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V for (ClusterNode node : subgrid) { ComputeJob job; - if (near && node.version().compareTo(NEAR_JOB_SINCE) >= 0) { + if (near) { job = keys == null ? new GlobalClearAllNearJob(cacheName, topVer) : new GlobalClearKeySetNearJob<>(cacheName, topVer, keys); } @@ -6795,6 +6669,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Constructor. * @param internalIterator Internal iterator. + * @param keepBinary Keep binary. */ private EntryIterator(Iterator<GridCacheMapEntry> internalIterator, boolean keepBinary) { this.internalIterator = internalIterator; @@ -6819,7 +6694,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throw new IllegalStateException(); try { - GridCacheAdapter.this.getAndRemove((K)current.wrapLazyValue(keepBinary).getKey()); + getAndRemove((K)current.wrapLazyValue(keepBinary).getKey()); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -6840,7 +6715,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Keep binary flag. */ private final boolean keepBinary; - /** Constructor. */ + /** Constructor. + * @param internalSet Internal set. + * @param keepBinary Keep binary flag. + */ private EntrySet(Set<GridCacheMapEntry> internalSet, boolean keepBinary) { this.internalSet = internalSet; this.keepBinary = keepBinary; http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3668910..231dff8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -67,7 +67,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; @@ -853,21 +852,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana lastVer, exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE); - boolean useOldApi = false; - - if (nodes != null) { - for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { - useOldApi = true; - compress = false; - - break; - } - else if (!canUsePartitionMapCompression(node)) - compress = false; - } - } - m.compress(compress); Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); @@ -890,13 +874,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (affCache != null) { GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - if (useOldApi) { - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), - locMap.nodeOrder(), - locMap.updateSequence(), - locMap); - } - addFullPartitionsMap(m, dupData, compress, @@ -955,7 +932,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana map.nodeOrder(), map.updateSequence()); - for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet()) + for (Map.Entry<UUID, GridDhtPartitionMap> e : map.entrySet()) map0.put(e.getKey(), e.getValue().emptyCopy()); map = map0; @@ -1007,25 +984,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean clientOnlyExchange, boolean sndCounters) { - boolean compress = canUsePartitionMapCompression(targetNode); - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, clientOnlyExchange, cctx.versions().last(), - compress); + true); Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { - GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); - - if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); + GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap(); addPartitionMap(m, dupData, - compress, + true, cacheCtx.cacheId(), locMap, cacheCtx.affinity().affinityCache().similarAffinityKey()); @@ -1039,11 +1011,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (m.partitions() != null && m.partitions().containsKey(top.cacheId())) continue; - GridDhtPartitionMap2 locMap = top.localPartitionMap(); + GridDhtPartitionMap locMap = top.localPartitionMap(); addPartitionMap(m, dupData, - compress, + true, top.cacheId(), locMap, top.similarAffinityKey()); @@ -1067,7 +1039,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData, boolean compress, Integer cacheId, - GridDhtPartitionMap2 map, + GridDhtPartitionMap map, Object affKey) { Integer dupDataCache = null; @@ -1292,7 +1264,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean updated = false; - for (Map.Entry<Integer, GridDhtPartitionMap2> entry : msg.partitions().entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); @@ -1575,24 +1547,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Target node. - * @return {@code True} if can use compression for partition map messages. - */ - @SuppressWarnings("SimplifiableIfStatement") - private boolean canUsePartitionMapCompression(ClusterNode node) { - IgniteProductVersion ver = node.version(); - - if (ver.compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) >= 0) { - if (ver.minor() == 7 && ver.maintenance() < 4) - return false; - - return true; - } - - return false; - } - - /** * Exchange future thread. All exchanges happen only by one thread and next * exchange will not start until previous one completes. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index c11f71f..656e70a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -39,7 +39,6 @@ import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.binary.BinaryBasicNameMapper; import org.apache.ignite.binary.BinaryField; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; @@ -99,7 +98,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.IgniteNodeValidationResult; import org.jetbrains.annotations.Nullable; @@ -115,9 +113,6 @@ import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements CacheObjectBinaryProcessor { /** */ - public static final IgniteProductVersion BINARY_CFG_CHECK_SINCE = IgniteProductVersion.fromString("1.5.7"); - - /** */ private final CountDownLatch startLatch = new CountDownLatch(1); /** */ @@ -351,37 +346,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm startLatch.countDown(); } - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK) && marsh instanceof BinaryMarshaller) { - BinaryConfiguration bcfg = ctx.config().getBinaryConfiguration(); - - for (ClusterNode rmtNode : ctx.discovery().remoteNodes()) { - if (rmtNode.version().compareTo(BINARY_CFG_CHECK_SINCE) < 0) { - if (bcfg == null || bcfg.getNameMapper() == null) { - throw new IgniteCheckedException("When BinaryMarshaller is used and topology contains old " + - "nodes, then " + BinaryBasicNameMapper.class.getName() + " mapper have to be set " + - "explicitely into binary configuration and 'simpleName' property of the mapper " + - "have to be set to 'true'."); - } - - if (!(bcfg.getNameMapper() instanceof BinaryBasicNameMapper) - || !((BinaryBasicNameMapper)bcfg.getNameMapper()).isSimpleName()) { - U.quietAndWarn(log, "When BinaryMarshaller is used and topology contains old" + - " nodes, it's strongly recommended, to set " + BinaryBasicNameMapper.class.getName() + - " mapper into binary configuration explicitely " + - " and 'simpleName' property of the mapper set to 'true' (fix configuration or set " + - "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)."); - } - - break; - } - } - } - } - /** * @param key Metadata key. * @param newMeta Metadata. @@ -926,9 +890,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm Object rmtBinaryCfg = rmtNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION); - if (rmtNode.version().compareTo(BINARY_CFG_CHECK_SINCE) < 0) - return null; - ClusterNode locNode = ctx.discovery().localNode(); Object locBinaryCfg = locNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION); http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 6ca15de..13a2f59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -158,7 +158,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { * @return Full map string representation. */ @SuppressWarnings( {"ConstantConditions"}) - private String mapString(GridDhtPartitionMap2 map) { + private String mapString(GridDhtPartitionMap map) { return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); } @@ -377,11 +377,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public GridDhtPartitionMap2 localPartitionMap() { + @Override public GridDhtPartitionMap localPartitionMap() { lock.readLock().lock(); try { - return new GridDhtPartitionMap2(cctx.localNodeId(), updateSeq.get(), topVer, + return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(), topVer, Collections.<Integer, GridDhtPartitionState>emptyMap(), true); } finally { @@ -394,7 +394,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { lock.readLock().lock(); try { - GridDhtPartitionMap2 partMap = node2part.get(nodeId); + GridDhtPartitionMap partMap = node2part.get(nodeId); if (partMap != null) { GridDhtPartitionState state = partMap.get(part); @@ -587,8 +587,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { lastExchangeId = exchId; if (node2part != null) { - for (GridDhtPartitionMap2 part : node2part.values()) { - GridDhtPartitionMap2 newPart = partMap.get(part.nodeId()); + for (GridDhtPartitionMap part : node2part.values()) { + GridDhtPartitionMap newPart = partMap.get(part.nodeId()); // If for some nodes current partition has a newer map, // then we keep the newer value. @@ -618,7 +618,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { Map<Integer, Set<UUID>> p2n = new HashMap<>(); - for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { for (Integer p : e.getValue().keySet()) { Set<UUID> ids = p2n.get(p); @@ -650,7 +650,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap2 parts, + GridDhtPartitionMap parts, Map<Integer, Long> cntrMap, boolean checkEvictions) { if (log.isDebugEnabled()) @@ -686,7 +686,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { node2part = new GridDhtPartitionFullMap(); } - GridDhtPartitionMap2 cur = node2part.get(parts.nodeId()); + GridDhtPartitionMap cur = node2part.get(parts.nodeId()); if (cur != null && cur.updateSequence() >= parts.updateSequence()) { if (log.isDebugEnabled()) @@ -797,10 +797,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } - GridDhtPartitionMap2 map = node2part.get(nodeId); + GridDhtPartitionMap map = node2part.get(nodeId); if (map == null) - node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer, + node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq, topVer, Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); map.updateSequence(updateSeq, topVer); @@ -838,7 +838,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { part2node = new HashMap<>(part2node); - GridDhtPartitionMap2 parts = node2part.remove(nodeId); + GridDhtPartitionMap parts = node2part.remove(nodeId); if (parts != null) { for (Integer p : parts.keySet()) { @@ -926,7 +926,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { ", locNodeId=" + cctx.localNodeId() + ", igniteInstanceName=" + cctx.igniteInstanceName() + ']'; - for (GridDhtPartitionMap2 map : node2part.values()) { + for (GridDhtPartitionMap map : node2part.values()) { if (map.hasMovingPartitions()) return true; } @@ -956,7 +956,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (nodeId == null) return false; - GridDhtPartitionMap2 parts = node2part.get(nodeId); + GridDhtPartitionMap parts = node2part.get(nodeId); // Set can be null if node has been removed. if (parts != null) { @@ -984,7 +984,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (node2part == null) return; - for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { for (Integer p : e.getValue().keySet()) { Set<UUID> nodeIds = part2node.get(p); @@ -996,7 +996,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) { for (UUID nodeId : e.getValue()) { - GridDhtPartitionMap2 map = node2part.get(nodeId); + GridDhtPartitionMap map = node2part.get(nodeId); assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']'; assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() + http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index 52dd190..e8094e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -47,14 +47,6 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** Topology version. */ private AffinityTopologyVersion topVer; - /** Affinity assignment. */ - @GridDirectTransient - @GridToStringInclude - private List<List<ClusterNode>> affAssignment; - - /** Affinity assignment bytes. */ - private byte[] affAssignmentBytes; - /** */ @GridDirectTransient private List<List<UUID>> affAssignmentIds; @@ -80,19 +72,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { * @param cacheId Cache ID. * @param topVer Topology version. * @param affAssignment Affinity assignment. - * @param sndNodeIds If {@code true} sends only node IDs instead of nodes. */ public GridDhtAffinityAssignmentResponse(int cacheId, @NotNull AffinityTopologyVersion topVer, - List<List<ClusterNode>> affAssignment, - boolean sndNodeIds) { + List<List<ClusterNode>> affAssignment) { this.cacheId = cacheId; this.topVer = topVer; - if (!sndNodeIds) - this.affAssignment = affAssignment; - else - affAssignmentIds = ids(affAssignment); + affAssignmentIds = ids(affAssignment); } /** {@inheritDoc} */ @@ -112,16 +99,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { * @return Affinity assignment. */ public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) { - if (affAssignment != null) - return affAssignment; - if (affAssignmentIds != null) - affAssignment = nodes(disco, affAssignmentIds); + return nodes(disco, affAssignmentIds); - return affAssignment; + return null; } /** + * @param disco Discovery manager. * @return Ideal affinity assignment. */ public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) { @@ -167,6 +152,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** * @param assignments Assignment. + * @return Assignment where cluster nodes are converted to their ids. */ private List<List<UUID>> ids(List<List<ClusterNode>> assignments) { if (assignments != null) { @@ -195,7 +181,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 6; } /** @@ -204,13 +190,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - assert affAssignment != null ^ affAssignmentIds != null; - - if (affAssignment != null && affAssignmentBytes == null) - affAssignmentBytes = U.marshal(ctx, affAssignment); + assert affAssignmentIds != null; - if (affAssignmentIds != null && affAssignmentIdsBytes == null) - affAssignmentIdsBytes = U.marshal(ctx, affAssignmentIds); + affAssignmentIdsBytes = U.marshal(ctx, affAssignmentIds); if (idealAffAssignment != null && idealAffAssignmentBytes == null) idealAffAssignmentBytes = U.marshal(ctx, idealAffAssignment); @@ -220,55 +202,16 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - assert affAssignmentBytes != null ^ affAssignmentIdsBytes != null; + assert affAssignmentIdsBytes != null; ldr = U.resolveClassLoader(ldr, ctx.gridConfig()); - if (affAssignmentBytes != null && affAssignment == null) - affAssignment = unmarshalNodes(affAssignmentBytes, ctx, ldr); - - if (affAssignmentIdsBytes != null && affAssignmentIds == null) - affAssignmentIds = U.unmarshal(ctx, affAssignmentIdsBytes, ldr); + affAssignmentIds = U.unmarshal(ctx, affAssignmentIdsBytes, ldr); if (idealAffAssignmentBytes != null && idealAffAssignment == null) idealAffAssignment = U.unmarshal(ctx, idealAffAssignmentBytes, ldr); } - /** - * @param bytes Assignment bytes. - * @param ctx Context. - * @param ldr Class loader. - * @return Assignment. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("ForLoopReplaceableByForEach") - private List<List<ClusterNode>> unmarshalNodes(byte[] bytes, - GridCacheSharedContext ctx, - ClassLoader ldr) - throws IgniteCheckedException - { - List<List<ClusterNode>> affAssignment = U.unmarshal(ctx, bytes, - U.resolveClassLoader(ldr, ctx.gridConfig())); - - // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented. - int assignments = affAssignment.size(); - - for (int n = 0; n < assignments; n++) { - List<ClusterNode> nodes = affAssignment.get(n); - - int size = nodes.size(); - - for (int i = 0; i < size; i++) { - ClusterNode node = nodes.get(i); - - if (node instanceof TcpDiscoveryNode) - ((TcpDiscoveryNode)node).local(node.id().equals(ctx.localNodeId())); - } - } - - return affAssignment; - } - /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return false; @@ -290,24 +233,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { switch (writer.state()) { case 3: - if (!writer.writeByteArray("affAssignmentBytes", affAssignmentBytes)) - return false; - - writer.incrementState(); - - case 4: if (!writer.writeByteArray("affAssignmentIdsBytes", affAssignmentIdsBytes)) return false; writer.incrementState(); - case 5: + case 4: if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes)) return false; writer.incrementState(); - case 6: + case 5: if (!writer.writeMessage("topVer", topVer)) return false; @@ -330,14 +267,6 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { switch (reader.state()) { case 3: - affAssignmentBytes = reader.readByteArray("affAssignmentBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: affAssignmentIdsBytes = reader.readByteArray("affAssignmentIdsBytes"); if (!reader.isLastRead()) @@ -345,7 +274,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { reader.incrementState(); - case 5: + case 4: idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes"); if (!reader.isLastRead()) @@ -353,7 +282,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { reader.incrementState(); - case 6: + case 5: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 605150a..aec3d7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.jetbrains.annotations.Nullable; @@ -148,7 +148,7 @@ public interface GridDhtPartitionTopology { /** * @return Local IDs. */ - public GridDhtPartitionMap2 localPartitionMap(); + public GridDhtPartitionMap localPartitionMap(); /** * @param nodeId Node ID. @@ -230,7 +230,7 @@ public interface GridDhtPartitionTopology { * @return {@code True} if topology state changed. */ @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap2 parts, + GridDhtPartitionMap parts, @Nullable Map<Integer, Long> cntrMap, boolean checkEvictions); http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index c476886..7a98366 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -43,7 +43,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; @@ -176,7 +176,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Full map string representation. */ @SuppressWarnings({"ConstantConditions"}) - private String mapString(GridDhtPartitionMap2 map) { + private String mapString(GridDhtPartitionMap map) { return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); } @@ -781,7 +781,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public GridDhtPartitionMap2 localPartitionMap() { + @Override public GridDhtPartitionMap localPartitionMap() { Map<Integer, GridDhtPartitionState> map = new HashMap<>(); lock.readLock().lock(); @@ -796,7 +796,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { map.put(i, part.state()); } - return new GridDhtPartitionMap2(cctx.nodeId(), + return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(), topVer, Collections.unmodifiableMap(map), @@ -812,7 +812,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.readLock().lock(); try { - GridDhtPartitionMap2 partMap = node2part.get(nodeId); + GridDhtPartitionMap partMap = node2part.get(nodeId); if (partMap != null) { GridDhtPartitionState state = partMap.get(part); @@ -1059,8 +1059,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lastExchangeId = exchId; if (node2part != null) { - for (GridDhtPartitionMap2 part : node2part.values()) { - GridDhtPartitionMap2 newPart = partMap.get(part.nodeId()); + for (GridDhtPartitionMap part : node2part.values()) { + GridDhtPartitionMap newPart = partMap.get(part.nodeId()); // If for some nodes current partition has a newer map, // then we keep the newer value. @@ -1095,7 +1095,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { Map<Integer, Set<UUID>> p2n = U.newHashMap(cctx.affinity().partitions()); - for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { for (Integer p : e.getValue().keySet()) { Set<UUID> ids = p2n.get(p); @@ -1136,7 +1136,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap2 parts, + GridDhtPartitionMap parts, @Nullable Map<Integer, Long> cntrMap, boolean checkEvictions) { if (log.isDebugEnabled()) @@ -1187,7 +1187,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // Create invalid partition map. node2part = new GridDhtPartitionFullMap(); - GridDhtPartitionMap2 cur = node2part.get(parts.nodeId()); + GridDhtPartitionMap cur = node2part.get(parts.nodeId()); if (cur != null && cur.updateSequence() >= parts.updateSequence()) { if (log.isDebugEnabled()) @@ -1400,10 +1400,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { UUID locNodeId = cctx.localNodeId(); - GridDhtPartitionMap2 map = node2part.get(locNodeId); + GridDhtPartitionMap map = node2part.get(locNodeId); if (map == null) { - map = new GridDhtPartitionMap2(locNodeId, + map = new GridDhtPartitionMap(locNodeId, updateSeq, topVer, Collections.<Integer, GridDhtPartitionState>emptyMap(), @@ -1448,7 +1448,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { else node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); - GridDhtPartitionMap2 parts = node2part.remove(nodeId); + GridDhtPartitionMap parts = node2part.remove(nodeId); if (parts != null) { for (Integer p : parts.keySet()) { @@ -1574,7 +1574,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.igniteInstanceName() + ']'; - for (GridDhtPartitionMap2 map : node2part.values()) { + for (GridDhtPartitionMap map : node2part.values()) { if (map.hasMovingPartitions()) return true; } @@ -1660,7 +1660,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (nodeId == null) return false; - GridDhtPartitionMap2 parts = node2part.get(nodeId); + GridDhtPartitionMap parts = node2part.get(nodeId); // Set can be null if node has been removed. if (parts != null) { @@ -1688,7 +1688,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (node2part == null) return; - for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { for (Integer p : e.getValue().keySet()) { Set<UUID> nodeIds = part2node.get(p); @@ -1700,7 +1700,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) { for (UUID nodeId : e.getValue()) { - GridDhtPartitionMap2 map = node2part.get(nodeId); + GridDhtPartitionMap map = node2part.get(nodeId); assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']'; assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() + http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 519239a..f555b84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -53,15 +53,12 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.SINGLE_GET_MSG_SINCE; - /** * Colocated get future. */ @@ -72,9 +69,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - /** Dummy version sent to older nodes for backward compatibility, */ - private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0, 0); - /** Logger. */ private static IgniteLogger log; @@ -335,7 +329,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda cctx.cacheId(), futId, fut.futureId(), - n.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0 ? null : DUMMY_VER, + null, mappedKeys, readThrough, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index a3f6b72..47f4066 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; @@ -53,7 +52,6 @@ import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -68,9 +66,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im /** */ private static final long serialVersionUID = 0L; - /** */ - public static final IgniteProductVersion SINGLE_GET_MSG_SINCE = IgniteProductVersion.fromString("1.5.0"); - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@ -270,41 +265,19 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im cctx.mvcc().addFuture(this, futId); } - GridCacheMessage req; - - if (node.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0) { - req = new GridNearSingleGetRequest(cctx.cacheId(), - futId.localId(), - key, - readThrough, - topVer, - subjId, - taskName == null ? 0 : taskName.hashCode(), - expiryPlc != null ? expiryPlc.forCreate() : -1L, - expiryPlc != null ? expiryPlc.forAccess() : -1L, - skipVals, - /**add reader*/false, - needVer, - cctx.deploymentEnabled()); - } - else { - Map<KeyCacheObject, Boolean> map = Collections.singletonMap(key, false); - - req = new GridNearGetRequest( - cctx.cacheId(), - futId, - futId, - cctx.versions().next(), - map, - readThrough, - topVer, - subjId, - taskName == null ? 0 : taskName.hashCode(), - expiryPlc != null ? expiryPlc.forCreate() : -1L, - expiryPlc != null ? expiryPlc.forAccess() : -1L, - skipVals, - cctx.deploymentEnabled()); - } + GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(), + futId.localId(), + key, + readThrough, + topVer, + subjId, + taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, + expiryPlc != null ? expiryPlc.forAccess() : -1L, + skipVals, + /**add reader*/false, + needVer, + cctx.deploymentEnabled()); try { cctx.io().send(node, req, cctx.ioPolicy());
