http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index 579796d..d2dc817 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { +public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable { /** Skip store flag bit mask. */ private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 47572fd..443b1b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -69,7 +69,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -192,19 +193,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { - return new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtAtomicCacheEntry(ctx, topVer, key); - } - }; - } - - /** {@inheritDoc} */ @Override protected void init() { super.init(); @@ -238,11 +226,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { metrics = m; - preldr = new GridDhtPreloader(ctx); - - preldr.start(); - - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @@ -256,7 +240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { @@ -270,7 +254,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearAtomicAbstractUpdateRequest.class, new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() { @@ -289,7 +273,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { @Override public void apply( @@ -307,7 +292,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicAbstractUpdateRequest.class, new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { @@ -326,7 +311,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { @@ -345,7 +330,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridDhtAtomicDeferredUpdateResponse.class, new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { @Override public void apply( @@ -363,7 +349,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() { @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { @@ -376,7 +363,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridNearAtomicCheckUpdateRequest.class, new CI2<UUID, GridNearAtomicCheckUpdateRequest>() { @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) { @@ -389,8 +377,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addCacheHandler( + ctx.cacheId(), + GridDhtForceKeysRequest.class, + new MessageHandler<GridDhtForceKeysRequest>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { + processForceKeysRequest(node, msg); + } + }); + + ctx.io().addCacheHandler( + ctx.cacheId(), + GridDhtForceKeysResponse.class, + new MessageHandler<GridDhtForceKeysResponse>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { + processForceKeyResponse(node, msg); + } + }); + if (near == null) { - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @@ -404,7 +410,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { @@ -1485,7 +1491,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Optimistically expect that all keys are available locally (avoid creation of get future). for (KeyCacheObject key : keys) { if (readNoEntry) { - CacheDataRow row = ctx.offheap().read(key); + CacheDataRow row = ctx.offheap().read(ctx, key); if (row != null) { long expireTime = row.expireTime(); @@ -1661,7 +1667,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicAbstractUpdateRequest req, final UpdateReplyClosure completionCb ) { - IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion()); + IgniteInternalFuture<Object> forceFut = ctx.group().preloader().request(ctx, req, req.topologyVersion()); if (forceFut == null || forceFut.isDone()) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java deleted file mode 100644 index b0c9a64..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * DHT atomic cache entry. - */ -public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry { - /** - * @param ctx Cache context. - * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). - * @param key Cache key. - */ - GridDhtAtomicCacheEntry( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - super(ctx, topVer, key); - } - - /** {@inheritDoc} */ - @Override protected String cacheName() { - return CU.isNearEnabled(cctx) ? super.cacheName() : cctx.dht().name(); - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(GridDhtAtomicCacheEntry.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index 92ef149..0c069da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridLongList; @@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable; /** * Deferred dht atomic update response. */ -public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtAtomicDeferredUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java index d6e2db0..71d2321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.nio.ByteBuffer; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -36,7 +36,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic /** * Message sent from DHT nodes to near node in FULL_SYNC mode. */ -public class GridDhtAtomicNearResponse extends GridCacheMessage { +public class GridDhtAtomicNearResponse extends GridCacheIdMessage { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 693d658..7b2547a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCheckedException; @@ -27,7 +26,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -39,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * DHT atomic cache backup update response. */ -public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index 4b3ea5bc..bb47af4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { +public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable { /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java index 4b9109e..96be023 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.nio.ByteBuffer; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage { +public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 55953ea..5ba024f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -45,7 +45,7 @@ import org.jetbrains.annotations.Nullable; /** * DHT atomic cache near update response. */ -public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 12a3912..708df49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -116,35 +115,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { - return new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtColocatedCacheEntry(ctx, topVer, key); - } - }; - } - - /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processNearGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { processNearSingleGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } @@ -467,7 +453,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte for (KeyCacheObject key : keys) { if (readNoEntry) { - CacheDataRow row = ctx.offheap().read(key); + CacheDataRow row = ctx.offheap().read(ctx, key); if (row != null) { long expireTime = row.expireTime(); @@ -941,7 +927,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer); // Prevent embedded future creation if possible. if (keyFut == null || keyFut.isDone()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java deleted file mode 100644 index f7cc5a7..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; - -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Cache entry for colocated cache. - */ -public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry { - /** - * @param ctx Cache context. - * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). - * @param key Cache key. - */ - GridDhtColocatedCacheEntry( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - super(ctx, topVer, key); - } - - /** {@inheritDoc} */ - @Override protected String cacheName() { - return cctx.colocated().name(); - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(GridDhtColocatedCacheEntry.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 562a165..a5a1eb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -102,9 +102,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec /** Future ID. */ private IgniteUuid futId = IgniteUuid.randomUuid(); - /** Preloader. */ - private GridDhtPreloader preloader; - /** Trackable flag. */ private boolean trackable; @@ -112,21 +109,19 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param cctx Cache context. * @param topVer Topology version. * @param keys Keys. - * @param preloader Preloader. */ public GridDhtForceKeysFuture( GridCacheContext<K, V> cctx, AffinityTopologyVersion topVer, - Collection<KeyCacheObject> keys, - GridDhtPreloader preloader + Collection<KeyCacheObject> keys ) { assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; + assert !cctx.isNear(); this.cctx = cctx; this.keys = keys; this.topVer = topVer; - this.preloader = preloader; top = cctx.dht().topology(); @@ -158,7 +153,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec @Override public boolean onDone(@Nullable Collection<K> res, @Nullable Throwable err) { if (super.onDone(res, err)) { if (trackable) - preloader.remoteFuture(this); + cctx.dht().removeFuture(this); return true; } @@ -170,7 +165,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param evt Discovery event. */ @SuppressWarnings( {"unchecked"}) - void onDiscoveryEvent(DiscoveryEvent evt) { + public void onDiscoveryEvent(DiscoveryEvent evt) { topCntr.incrementAndGet(); int type = evt.type(); @@ -244,7 +239,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec int curTopVer = topCntr.get(); - if (!preloader.addFuture(this)) { + if (!cctx.dht().addFuture(this)) { assert isDone() : this; return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java index d129ae8..124ae44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -40,7 +40,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; * Force keys request. This message is sent by node while preloading to force * another node to put given keys into the next batch of transmitting entries. */ -public class GridDhtForceKeysRequest extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtForceKeysRequest extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java index c4c57a7..977e9ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -43,7 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Force keys response. Contains absent keys. */ -public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtForceKeysResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; @@ -168,7 +168,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa if (infos != null) { for (GridCacheEntryInfo info : infos) - info.marshal(cctx); + info.marshal(cctx.cacheObjectContext()); } if (err != null && errBytes == null) @@ -186,7 +186,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa if (infos != null) { for (GridCacheEntryInfo info : infos) - info.unmarshal(cctx, ldr); + info.unmarshal(cctx.cacheObjectContext(), ldr); } if (errBytes != null && err == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 24b1de1..04a7e97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -39,7 +39,7 @@ import org.jetbrains.annotations.NotNull; /** * Partition demand request. */ -public class GridDhtPartitionDemandMessage extends GridCacheMessage { +public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { /** */ private static final long serialVersionUID = 0L; @@ -73,10 +73,10 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { /** * @param updateSeq Update sequence for this node. * @param topVer Topology version. - * @param cacheId Cache ID. + * @param grpId Cache group ID. */ - GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) { - this.cacheId = cacheId; + GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) { + this.grpId = grpId; this.updateSeq = updateSeq; this.topVer = topVer; } @@ -86,7 +86,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { * @param parts Partitions. */ GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, Map<Integer, Long> partsCntrs) { - cacheId = cp.cacheId; + grpId = cp.grpId; updateSeq = cp.updateSeq; topic = cp.topic; timeout = cp.timeout; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index cdbae1a..a1b45df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -38,13 +37,16 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -78,7 +80,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; @SuppressWarnings("NonConstantFieldWithUpperCaseName") public class GridDhtPartitionDemander { /** */ - private final GridCacheContext<?, ?> cctx; + private final GridCacheSharedContext<?, ?> ctx; + + /** */ + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -104,30 +109,20 @@ public class GridDhtPartitionDemander { private final Map<Integer, Object> rebalanceTopics; /** - * Started event sent. - * Make sense for replicated cache only. + * @param grp Ccahe group. */ - private final AtomicBoolean startedEvtSent = new AtomicBoolean(); + public GridDhtPartitionDemander(CacheGroupContext grp) { + assert grp != null; - /** - * Stopped event sent. - * Make sense for replicated cache only. - */ - private final AtomicBoolean stoppedEvtSent = new AtomicBoolean(); + this.grp = grp; - /** - * @param cctx Cctx. - */ - public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) { - assert cctx != null; + ctx = grp.shared(); - this.cctx = cctx; + log = ctx.logger(getClass()); - log = cctx.logger(getClass()); + boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode(); - boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); - - rebalanceFut = new RebalanceFuture();//Dummy. + rebalanceFut = new RebalanceFuture(); //Dummy. if (!enabled) { // Calling onDone() immediately since preloading is disabled. @@ -137,7 +132,7 @@ public class GridDhtPartitionDemander { Map<Integer, Object> tops = new HashMap<>(); - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) + for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++) tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx)); rebalanceTopics = tops; @@ -196,7 +191,7 @@ public class GridDhtPartitionDemander { GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); if (obj != null) - cctx.time().removeTimeoutObject(obj); + ctx.time().removeTimeoutObject(obj); final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; @@ -208,7 +203,7 @@ public class GridDhtPartitionDemander { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut); + IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut); fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> future) { @@ -237,7 +232,7 @@ public class GridDhtPartitionDemander { */ private boolean topologyChanged(RebalanceFuture fut) { return - !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed. + !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed. fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions. } @@ -268,12 +263,12 @@ public class GridDhtPartitionDemander { assert force == (forcedRebFut != null); - long delay = cctx.config().getRebalanceDelay(); + long delay = grp.config().getRebalanceDelay(); if (delay == 0 || force) { final RebalanceFuture oldFut = rebalanceFut; - final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt); + final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt); if (!oldFut.isInitial()) oldFut.cancel(); @@ -319,7 +314,7 @@ public class GridDhtPartitionDemander { fut.onDone(true); - ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + ((GridFutureAdapter)grp.preloader().syncFuture()).onDone(); fut.sendRebalanceFinishedEvent(); @@ -350,7 +345,7 @@ public class GridDhtPartitionDemander { GridTimeoutObject obj = lastTimeoutObj.get(); if (obj != null) - cctx.time().removeTimeoutObject(obj); + ctx.time().removeTimeoutObject(obj); final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; @@ -360,7 +355,7 @@ public class GridDhtPartitionDemander { @Override public void onTimeout() { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { - cctx.shared().exchange().forceRebalance(exchFut); + ctx.exchange().forceRebalance(exchFut); } }); } @@ -368,7 +363,7 @@ public class GridDhtPartitionDemander { lastTimeoutObj.set(obj); - cctx.time().addTimeoutObject(obj); + ctx.time().addTimeoutObject(obj); } return null; @@ -399,17 +394,19 @@ public class GridDhtPartitionDemander { Collection<Integer> parts= e.getValue().partitions(); - assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]"; + assert parts != null : "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]"; fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts)); } } + final CacheConfiguration cfg = grp.config(); + + int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize(); + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { final ClusterNode node = e.getKey(); - final CacheConfiguration cfg = cctx.config(); - final Collection<Integer> parts = fut.remaining.get(node.id()).get2(); GridDhtPartitionDemandMessage d = e.getValue(); @@ -418,8 +415,6 @@ public class GridDhtPartitionDemander { ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); - final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); for (int cnt = 0; cnt < lsnrCnt; cnt++) @@ -439,16 +434,16 @@ public class GridDhtPartitionDemander { initD.topic(rebalanceTopics.get(cnt)); initD.updateSequence(fut.updateSeq); - initD.timeout(cctx.config().getRebalanceTimeout()); + initD.timeout(cfg.getRebalanceTimeout()); final int finalCnt = cnt; - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + ctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { try { if (!fut.isDone()) { - cctx.io().sendOrderedMessage(node, - rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout()); + ctx.io().sendOrderedMessage(node, + rebalanceTopics.get(finalCnt), initD, grp.ioPolicy(), initD.timeout()); // Cleanup required in case partitions demanded in parallel with cancellation. synchronized (fut) { @@ -495,11 +490,11 @@ public class GridDhtPartitionDemander { for (Integer part : parts) { try { - if (cctx.shared().database().persistenceEnabled()) { + if (ctx.database().persistenceEnabled()) { if (partCntrs == null) partCntrs = new HashMap<>(parts.size(), 1.0f); - GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false); + GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false); partCntrs.put(part, p.initialUpdateCounter()); } @@ -575,7 +570,7 @@ public class GridDhtPartitionDemander { final RebalanceFuture fut = rebalanceFut; - ClusterNode node = cctx.node(id); + ClusterNode node = ctx.node(id); if (node == null) return; @@ -599,14 +594,16 @@ public class GridDhtPartitionDemander { return; } - final GridDhtPartitionTopology top = cctx.dht().topology(); + final GridDhtPartitionTopology top = grp.topology(); try { + AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); + // Preload. for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { int p = e.getKey(); - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (aff.get(p).contains(ctx.localNode())) { GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -617,7 +614,7 @@ public class GridDhtPartitionDemander { boolean reserved = part.reserve(); assert reserved : "Failed to reserve partition [igniteInstanceName=" + - cctx.igniteInstanceName() + ", cacheName=" + cctx.name() + ", part=" + part + ']'; + ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']'; part.lock(); @@ -676,7 +673,7 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : supply.missed()) { - if (cctx.affinity().partitionLocalNode(miss, topVer)) + if (aff.get(miss).contains(ctx.localNode())) fut.partitionMissed(id, miss); } @@ -684,16 +681,18 @@ public class GridDhtPartitionDemander { fut.partitionDone(id, miss); GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - supply.updateSequence(), supply.topologyVersion(), cctx.cacheId()); + supply.updateSequence(), + supply.topologyVersion(), + grp.groupId()); - d.timeout(cctx.config().getRebalanceTimeout()); + d.timeout(grp.config().getRebalanceTimeout()); d.topic(rebalanceTopics.get(idx)); if (!topologyChanged(fut) && !fut.isDone()) { // Send demand message. - cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), - d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), + d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); } } catch (IgniteCheckedException e) { @@ -722,13 +721,15 @@ public class GridDhtPartitionDemander { GridCacheEntryInfo entry, AffinityTopologyVersion topVer ) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { GridCacheEntryEx cached = null; try { - cached = cctx.dht().entryEx(entry.key()); + GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext(); + + cached = cctx.dhtCache().entryEx(entry.key()); if (log.isDebugEnabled()) log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); @@ -779,10 +780,10 @@ public class GridDhtPartitionDemander { } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + - cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } return true; @@ -798,16 +799,10 @@ public class GridDhtPartitionDemander { */ public static class RebalanceFuture extends GridFutureAdapter<Boolean> { /** */ - private static final long serialVersionUID = 1L; - - /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */ - private final AtomicBoolean startedEvtSent; - - /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */ - private final AtomicBoolean stoppedEvtSent; + private final GridCacheSharedContext<?, ?> ctx; /** */ - private final GridCacheContext<?, ?> cctx; + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -829,40 +824,37 @@ public class GridDhtPartitionDemander { private final long updateSeq; /** + * @param grp Cache group. * @param assigns Assigns. - * @param cctx Context. * @param log Logger. - * @param startedEvtSent Start event sent flag. - * @param stoppedEvtSent Stop event sent flag. * @param updateSeq Update sequence. */ - RebalanceFuture(GridDhtPreloaderAssignments assigns, - GridCacheContext<?, ?> cctx, + RebalanceFuture( + CacheGroupContext grp, + GridDhtPreloaderAssignments assigns, IgniteLogger log, - AtomicBoolean startedEvtSent, - AtomicBoolean stoppedEvtSent, long updateSeq) { assert assigns != null; - this.exchFut = assigns.exchangeFuture(); - this.topVer = assigns.topologyVersion(); - this.cctx = cctx; + exchFut = assigns.exchangeFuture(); + topVer = assigns.topologyVersion(); + + this.grp = grp; this.log = log; - this.startedEvtSent = startedEvtSent; - this.stoppedEvtSent = stoppedEvtSent; this.updateSeq = updateSeq; + + ctx= grp.shared(); } /** * Dummy future. Will be done by real one. */ - public RebalanceFuture() { + RebalanceFuture() { this.exchFut = null; this.topVer = null; - this.cctx = null; + this.ctx = null; + this.grp = null; this.log = null; - this.startedEvtSent = null; - this.stoppedEvtSent = null; this.updateSeq = -1; } @@ -900,7 +892,7 @@ public class GridDhtPartitionDemander { U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']'); - if (!cctx.kernalContext().isStopping()) { + if (!ctx.kernalContext().isStopping()) { for (UUID nodeId : remaining.keySet()) cleanupRemoteContexts(nodeId); } @@ -921,7 +913,7 @@ public class GridDhtPartitionDemander { if (isDone()) return; - U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() + + U.log(log, ("Cancelled rebalancing [cache=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + ", topology=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); @@ -956,22 +948,24 @@ public class GridDhtPartitionDemander { * @param nodeId Node id. */ private void cleanupRemoteContexts(UUID nodeId) { - ClusterNode node = cctx.discovery().node(nodeId); + ClusterNode node = ctx.discovery().node(nodeId); if (node == null) return; GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId()); + -1/* remove supply context signal */, + this.topologyVersion(), + grp.groupId()); - d.timeout(cctx.config().getRebalanceTimeout()); + d.timeout(grp.config().getRebalanceTimeout()); try { - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); - cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), - d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), + d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); } } catch (IgniteCheckedException ignored) { @@ -989,20 +983,19 @@ public class GridDhtPartitionDemander { if (isDone()) return; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, - exchFut.discoveryEvent()); + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent()); T2<Long, Collection<Integer>> t = remaining.get(nodeId); - assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId + + assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId + ", part=" + p + "]"; Collection<Integer> parts = t.get2(); boolean rmvd = parts.remove(p); - assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId + + assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId + ", part=" + p + ", left=" + parts + "]"; if (parts.isEmpty()) { @@ -1022,18 +1015,18 @@ public class GridDhtPartitionDemander { * @param type Type. * @param discoEvt Discovery event. */ - private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { + private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) { assert discoEvt != null; - cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); } /** * @param type Type. * @param discoEvt Discovery event. */ - private void preloadEvent(int type, DiscoveryEvent discoEvt) { - preloadEvent(-1, type, discoEvt); + private void rebalanceEvent(int type, DiscoveryEvent discoEvt) { + rebalanceEvent(-1, type, discoEvt); } /** @@ -1053,7 +1046,7 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Completed rebalance future: " + this); - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); Collection<Integer> m = new HashSet<>(); @@ -1067,13 +1060,13 @@ public class GridDhtPartitionDemander { onDone(false); //Finished but has missed partitions, will force dummy exchange - cctx.shared().exchange().forceDummyExchange(true, exchFut); + ctx.exchange().forceDummyExchange(true, exchFut); return; } - if (!cancelled && !cctx.preloader().syncFuture().isDone()) - ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + if (!cancelled && !grp.preloader().syncFuture().isDone()) + ((GridFutureAdapter)grp.preloader().syncFuture()).onDone(); onDone(!cancelled); } @@ -1083,24 +1076,16 @@ public class GridDhtPartitionDemander { * */ private void sendRebalanceStartedEvent() { - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) && - (!cctx.isReplicated() || !startedEvtSent.get())) { - preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); - - startedEvtSent.set(true); - } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED)) + rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); } /** * */ private void sendRebalanceFinishedEvent() { - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && - (!cctx.isReplicated() || !stoppedEvtSent.get())) { - preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); - - stoppedEvtSent.set(true); - } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED)) + rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index f7f0aff..afdeb8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -26,7 +27,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; @@ -47,7 +48,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh */ class GridDhtPartitionSupplier { /** */ - private final GridCacheContext<?, ?> cctx; + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -65,18 +66,18 @@ class GridDhtPartitionSupplier { private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>(); /** - * @param cctx Cache context. + * @param grp Cache group. */ - GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) { - assert cctx != null; + GridDhtPartitionSupplier(CacheGroupContext grp) { + assert grp != null; - this.cctx = cctx; + this.grp = grp; - log = cctx.logger(getClass()); + log = grp.shared().logger(getClass()); - top = cctx.dht().topology(); + top = grp.topology(); - depEnabled = cctx.gridDeploy().enabled(); + depEnabled = grp.shared().gridDeploy().enabled(); } /** @@ -171,7 +172,7 @@ class GridDhtPartitionSupplier { assert d != null; assert id != null; - AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion cutTop = grp.affinity().lastVersion(); AffinityTopologyVersion demTop = d.topologyVersion(); T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop); @@ -197,9 +198,12 @@ class GridDhtPartitionSupplier { ", from=" + id + ", idx=" + idx + "]"); GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage( - d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); + d.updateSequence(), + grp.groupId(), + d.topologyVersion(), + grp.deploymentEnabled()); - ClusterNode node = cctx.discovery().node(id); + ClusterNode node = grp.shared().discovery().node(id); if (node == null) return; // Context will be cleaned at topology change. @@ -225,7 +229,7 @@ class GridDhtPartitionSupplier { boolean newReq = true; - long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount(); + long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount(); if (sctx != null) { phase = sctx.phase; @@ -234,7 +238,7 @@ class GridDhtPartitionSupplier { } else { if (log.isDebugEnabled()) - log.debug("Starting supplying rebalancing [cache=" + cctx.name() + + log.debug("Starting supplying rebalancing [cache=" + grp.cacheOrGroupName() + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -280,7 +284,7 @@ class GridDhtPartitionSupplier { IgniteRebalanceIterator iter; if (sctx == null || sctx.entryIt == null) { - iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part)); + iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part)); if (!iter.historical()) s.clean(part); @@ -289,7 +293,9 @@ class GridDhtPartitionSupplier { iter = (IgniteRebalanceIterator)sctx.entryIt; while (iter.hasNext()) { - if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { + List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part); + + if (!nodes.contains(node)) { // Demander no longer needs this partition, // so we send '-1' partition and move on. s.missed(part); @@ -313,7 +319,7 @@ class GridDhtPartitionSupplier { break; } - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (s.messageSize() >= grp.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { saveSupplyContext(scId, phase, @@ -335,9 +341,9 @@ class GridDhtPartitionSupplier { return; s = new GridDhtPartitionSupplyMessage(d.updateSequence(), - cctx.cacheId(), + grp.groupId(), d.topologyVersion(), - cctx.deploymentEnabled()); + grp.deploymentEnabled()); } } @@ -349,9 +355,10 @@ class GridDhtPartitionSupplier { info.expireTime(row.expireTime()); info.version(row.version()); info.value(row.value()); + info.cacheId(row.cacheId()); if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx); + s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext()); else { if (log.isDebugEnabled()) log.debug("Rebalance predicate evaluated to false (will not send " + @@ -400,7 +407,7 @@ class GridDhtPartitionSupplier { reply(node, d, s, scId); if (log.isDebugEnabled()) - log.debug("Finished supplying rebalancing [cache=" + cctx.name() + + log.debug("Finished supplying rebalancing [cache=" + grp.cacheOrGroupName() + ", fromNode=" + node.id() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -427,16 +434,15 @@ class GridDhtPartitionSupplier { GridDhtPartitionSupplyMessage s, T3<UUID, Integer, AffinityTopologyVersion> scId) throws IgniteCheckedException { - try { if (log.isDebugEnabled()) log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout()); // Throttle preloading. - if (cctx.config().getRebalanceThrottle() > 0) - U.sleep(cctx.config().getRebalanceThrottle()); + if (grp.config().getRebalanceThrottle() > 0) + U.sleep(grp.config().getRebalanceThrottle()); return true; } @@ -469,7 +475,7 @@ class GridDhtPartitionSupplier { AffinityTopologyVersion topVer, long updateSeq) { synchronized (scMap) { - if (cctx.affinity().affinityTopologyVersion().equals(topVer)) { + if (grp.affinity().lastVersion().equals(topVer)) { assert scMap.get(t) == null; scMap.put(t, http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 903a7da..f8d4344 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -31,10 +31,11 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -45,7 +46,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Partition supply message. */ -public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; @@ -79,18 +80,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** * @param updateSeq Update sequence for this node. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param topVer Topology version. * @param addDepInfo Deployment info flag. */ GridDhtPartitionSupplyMessage(long updateSeq, - int cacheId, + int grpId, AffinityTopologyVersion topVer, boolean addDepInfo) { - this.cacheId = cacheId; + this.grpId = grpId; this.updateSeq = updateSeq; this.topVer = topVer; - this.addDepInfo = addDepInfo; } + this.addDepInfo = addDepInfo; + } /** * Empty constructor required for {@link Externalizable}. @@ -203,18 +205,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** * @param p Partition. * @param info Entry to add. - * @param ctx Cache context. + * @param ctx Cache shared context. + * @param cacheObjCtx Cache object context. * @throws IgniteCheckedException If failed. */ - void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { + void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { assert info != null; assert info.key() != null : info; assert info.value() != null : info; // Need to call this method to initialize info properly. - marshalInfo(info, ctx); + marshalInfo(info, ctx, cacheObjCtx); - msgSize += info.marshalledSize(ctx); + msgSize += info.marshalledSize(cacheObjCtx); CacheEntryInfoCollection infoCol = infos().get(p); @@ -234,13 +237,13 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - GridCacheContext cacheCtx = ctx.cacheContext(cacheId); + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); for (CacheEntryInfoCollection col : infos().values()) { List<GridCacheEntryInfo> entries = col.infos(); for (int i = 0; i < entries.size(); i++) - entries.get(i).unmarshal(cacheCtx, ldr); + entries.get(i).unmarshal(grp.cacheObjectContext(), ldr); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 74bbcb0..441952d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -65,6 +65,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public int partition() { return GridIoMessage.STRIPE_DISABLED_PART; } @@ -87,10 +92,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Parition update counters. */ - public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId); + public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId); /** * @return Last used version among all nodes. @@ -114,6 +119,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -128,19 +138,19 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } switch (writer.state()) { - case 3: + case 2: if (!writer.writeMessage("exchId", exchId)) return false; writer.incrementState(); - case 4: + case 3: if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); - case 5: + case 4: if (!writer.writeMessage("lastVer", lastVer)) return false; @@ -162,7 +172,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage return false; switch (reader.state()) { - case 3: + case 2: exchId = reader.readMessage("exchId"); if (!reader.isLastRead()) @@ -170,7 +180,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage reader.incrementState(); - case 4: + case 3: flags = reader.readByte("flags"); if (!reader.isLastRead()) @@ -178,7 +188,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage reader.incrementState(); - case 5: + case 4: lastVer = reader.readMessage("lastVer"); if (!reader.isLastRead())