ignite-45 - Fixed topology updates.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/74813a60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/74813a60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/74813a60 Branch: refs/heads/ignite-45 Commit: 74813a60127c9c5584b95f7a3ca755a232a6f983 Parents: 84e3196 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Sun Mar 15 22:06:45 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Sun Mar 15 22:06:45 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/ignite/Ignite.java | 4 --- .../GridCachePartitionExchangeManager.java | 6 ++-- .../dht/GridClientPartitionTopology.java | 19 ++++++++----- .../dht/GridDhtPartitionTopology.java | 4 +-- .../dht/GridDhtPartitionTopologyImpl.java | 13 +++++---- .../preloader/GridDhtPartitionExchangeId.java | 8 ------ .../GridDhtPartitionsExchangeFuture.java | 30 ++++++++++++++++---- 7 files changed, 49 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 4a5b469..7201734 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -41,7 +41,6 @@ import java.util.concurrent.*; * <p> * In addition to {@link ClusterGroup} functionality, from here you can get the following: * <ul> - * <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed cache.</li> * <li>{@link IgniteDataStreamer} - functionality for loading data large amounts of data into cache.</li> * <li>{@link IgniteFileSystem} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li> * <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li> @@ -198,9 +197,6 @@ public interface Ignite extends AutoCloseable { public <K, V> IgniteCache<K, V> createCache(NearCacheConfiguration<K, V> nearCfg); - // TODO IGNITE-45 - //public <K, V> IgniteCache<K, V> createCache(String name, String path); - /** * Stops dynamically started cache. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3e236bf..1c4c0f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -332,17 +332,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param cacheId Cache ID. - * @param exchId Exchange ID. + * @param exchFut Exchange future. * @return Topology. */ - public GridDhtPartitionTopology clientTopology(int cacheId, GridDhtPartitionExchangeId exchId) { + public GridDhtPartitionTopology clientTopology(int cacheId, GridDhtPartitionsExchangeFuture exchFut) { GridClientPartitionTopology top = clientTops.get(cacheId); if (top != null) return top; GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId, - top = new GridClientPartitionTopology(cctx, cacheId, exchId)); + top = new GridClientPartitionTopology(cctx, cacheId, exchFut)); return old != null ? old : top; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 646234c..27e1c22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -80,18 +80,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** * @param cctx Context. * @param cacheId Cache ID. - * @param exchId Exchange ID. + * @param exchFut Exchange ID. */ - public GridClientPartitionTopology(GridCacheSharedContext cctx, int cacheId, - GridDhtPartitionExchangeId exchId) { + public GridClientPartitionTopology( + GridCacheSharedContext cctx, + int cacheId, + GridDhtPartitionsExchangeFuture exchFut + ) { this.cctx = cctx; this.cacheId = cacheId; - topVer = exchId.topologyVersion(); + topVer = exchFut.topologyVersion(); log = cctx.logger(getClass()); - beforeExchange(exchId); + beforeExchange(exchFut); } /** @@ -181,7 +184,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) { + @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) { ClusterNode loc = cctx.localNode(); lock.writeLock().lock(); @@ -190,6 +193,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (stopping) return; + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; @@ -205,7 +210,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id())) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 7db15c7..dcc0502 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -67,10 +67,10 @@ public interface GridDhtPartitionTopology { /** * Pre-initializes this topology. * - * @param exchId Exchange ID for this pre-initialization. + * @param exchFut Exchange future. * @throws IgniteCheckedException If failed. */ - public void beforeExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException; + public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException; /** * Post-initializes this topology. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 7a172cd..c123e92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -207,7 +207,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { + @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { waitForRent(); ClusterNode loc = cctx.localNode(); @@ -217,6 +217,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { lock.writeLock().lock(); try { + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + if (stopping) return; @@ -235,7 +237,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchId.isCacheAdded(cctx.cacheId())) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq); @@ -262,9 +264,10 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { if (cctx.rebalanceEnabled()) { for (int p = 0; p < num; p++) { // If this is the first node in grid. - if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) || exchId.isCacheAdded( - cctx.cacheId())) { - assert exchId.isJoined() || exchId.isCacheAdded(cctx.cacheId()); + boolean added = exchFut.isCacheAdded(cctx.cacheId()); + + if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) { + assert exchId.isJoined() || added; try { GridDhtLocalPartition locPart = localPartition(p, topVer, true, false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java index 96cd7cb..22910fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java @@ -106,14 +106,6 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa return evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED; } - /** - * @return {@code True} if cache was added with this exchange ID. - */ - public boolean isCacheAdded(int cacheId) { - // TODO IGNITE-45 add cache added flag. - return evt == EVT_DISCOVERY_CUSTOM_EVT; - } - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeUuid(out, nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74813a60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 19c609d..7858b84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -275,6 +275,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param cacheId Cache ID to check. + * @return {@code True} if cache was added during this exchange. + */ + public boolean isCacheAdded(int cacheId) { + if (!F.isEmpty(reqs)) { + for (DynamicCacheChangeRequest req : reqs) { + if (req.isStart() && !req.clientStartOnly()) { + if (CU.cacheId(req.cacheName()) == cacheId) + return true; + } + } + } + + return false; + } + + /** * Rechecks topology. */ private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException { @@ -488,13 +505,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert topVer.equals(top.topologyVersion()) : "Topology version is updated only in this class instances inside single ExchangeWorker thread."; - top.beforeExchange(exchId); + top.beforeExchange(this); } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { top.updateTopologyVersion(exchId, this, stopping(top.cacheId())); - top.beforeExchange(exchId); + top.beforeExchange(this); } } catch (IgniteInterruptedCheckedException e) { @@ -549,7 +566,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @return {@code True} if no distributed exchange is needed. */ private boolean canSkipExchange() { - return false; // TODO ignite-45; + return false; // TODO ignite-23; } /** @@ -907,7 +924,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue()); else if (CU.oldest(cctx).isLocal()) - cctx.exchange().clientTopology(cacheId, exchId).update(exchId, entry.getValue()); + cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); } } @@ -922,7 +939,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridCacheContext cacheCtx = cctx.cacheContext(cacheId); GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : - cctx.exchange().clientTopology(cacheId, exchId); + cctx.exchange().clientTopology(cacheId, this); top.update(exchId, entry.getValue()); } @@ -977,7 +994,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange(exchId); + cacheCtx.topology().beforeExchange( + GridDhtPartitionsExchangeFuture.this); } } catch (IgniteCheckedException e) {