WIP on state concurrency.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4376670b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4376670b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4376670b Branch: refs/heads/ignite-4565-ddl Commit: 4376670bbf6354deafbd787dca10d8cbe26fc7e5 Parents: d7bf1b7 Author: devozerov <[email protected]> Authored: Fri Mar 17 14:44:52 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Mar 17 14:44:52 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 2 +- .../cache/DynamicCacheChangeRequest.java | 2 +- .../cache/DynamicCacheDescriptor.java | 120 +++++++++++++++++-- .../processors/cache/GridCacheProcessor.java | 64 +++++----- .../processors/query/QueryIndexStates.java | 7 +- .../ddl/IndexAbstractDiscoveryMessage.java | 17 +++ 6 files changed, 154 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index d2b09a8..0ce8718 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -352,7 +352,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap false, req.deploymentId()); - desc.indexStates(req.indexStates()); + desc.tryUpdateFromDiscovery(req.indexStates()); DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc); http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index bd2ea17..3610a60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -301,7 +301,7 @@ public class DynamicCacheChangeRequest implements Serializable { * @param idxStates Index states. */ public void indexStates(QueryIndexStates idxStates) { - this.idxStates = idxStates; + this.idxStates = idxStates != null ? idxStates.copy() : null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index a90fb72..a6de0ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -25,10 +25,15 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.plugin.CachePluginManager; import org.apache.ignite.internal.processors.query.QueryIndexStates; +import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage; +import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage; +import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -82,10 +87,16 @@ public class DynamicCacheDescriptor { /** */ private AffinityTopologyVersion rcvdFromVer; - /** Initial dynamic index state as observed from cache processor start() method and discovery join process. */ - private QueryIndexStates initIdxStates; + /** Mutex to control index states. */ + private final Object idxStatesMux = new Object(); - /** Dynamic index states. */ + /** Initial index states which is used to start cache. */ + private QueryIndexStates idxStatesForStart; + + /** Whether index states for start is fixed. */ + private boolean idxStatesForStartFixed; + + /** Current index states. */ private QueryIndexStates idxStates; /** @@ -310,31 +321,114 @@ public class DynamicCacheDescriptor { } /** + * Get index state for cache start. Once requested it never changes afterwards. + * + * @return Index states for cache start. + */ + public QueryIndexStates indexStatesForStart() { + synchronized (idxStatesMux) { + if (!idxStatesForStartFixed) { + idxStatesForStart = idxStates; + + idxStatesForStartFixed = true; + } + + return idxStatesForStart != null ? idxStatesForStart.copy() : null; + } + } + + /** * @return Index states. */ - public QueryIndexStates initialIndexStates() { - return initIdxStates; + public QueryIndexStates indexStates() { + synchronized (idxStatesMux) { + return idxStates != null ? idxStates.copy() : null; + } } /** - * @param initIdxStates Index states. + * Try updating index state from discovery thread. If start state is not fixed yet, update will succeed and return + * {@code true}. + * + * @param idxStates Index states. */ - public void initialIndexStates(QueryIndexStates initIdxStates) { - this.initIdxStates = initIdxStates != null ? initIdxStates.copy() : null; + public void tryUpdateFromDiscovery(QueryIndexStates idxStates) { + synchronized (idxStatesMux) { + if (!idxStatesForStartFixed) + this.idxStates = idxStates != null ? idxStates.copy() : null; + } } /** - * @return Index states. + * Try performing propose from discovery thread. + * + * @param locNodeId Local node ID. + * @param msg Message. */ - public QueryIndexStates indexStates() { - return idxStates; + public void tryProposeFromDiscoveryThread(UUID locNodeId, IndexProposeDiscoveryMessage msg) { + synchronized (idxStatesMux) { + if (idxStates == null) + idxStates = new QueryIndexStates(); + + idxStates.propose(locNodeId, msg); + } } /** + * Try applying accept message. + * + * @param msg Message. + * @param disco Whether call is performed from discovery thread. + * @return Result. + */ + public boolean tryAccept(IndexAcceptDiscoveryMessage msg, boolean disco) { + synchronized (idxStatesMux) { + if (disco && idxStatesForStartFixed) { + msg.exchange(true); + + return false; + } + + if (idxStates == null) + idxStates = new QueryIndexStates(); + + return idxStates.accept(msg); + } + } + + /** + * Try applying finish message. + * + * @param msg Message. + * @param disco Whether call is performed from discovery thread. + * @return Result. + */ + public boolean tryFinish(IndexFinishDiscoveryMessage msg, boolean disco) { + synchronized (idxStatesMux) { + if (disco && idxStatesForStartFixed) { + msg.exchange(true); + + return false; + } + + if (idxStates == null) + idxStates = new QueryIndexStates(); + + return idxStates.finish(msg); + } + } + + /** + * Forcefully update index states from exchange thread. + * * @param idxStates Index states. */ - public void indexStates(QueryIndexStates idxStates) { - this.idxStates = idxStates != null ? idxStates.copy() : null; + public void updateIndexStatesFromExchange(QueryIndexStates idxStates) { + synchronized (idxStatesMux) { + assert idxStatesForStartFixed; + + this.idxStates = idxStates != null ? idxStates.copy() : null; + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 8b8f5c7..8ae3283 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -825,7 +825,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { caches.put(maskNull(name), cache); - startCache(cache, desc.indexStates()); + startCache(cache, desc.indexStatesForStart()); jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); } @@ -1698,7 +1698,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { null, desc.deploymentId(), topVer, - desc.indexStates() + desc.indexStatesForStart() ); } } @@ -2133,7 +2133,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.getCacheMode()); } - existing.indexStates(req.indexStates()); + existing.tryUpdateFromDiscovery(req.indexStates()); } else { assert req.cacheType() != null : req; @@ -2145,8 +2145,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { false, req.deploymentId()); - desc.initialIndexStates(req.indexStates()); - desc.indexStates(req.indexStates()); + desc.tryUpdateFromDiscovery(req.indexStates()); // Received statically configured cache. if (req.initiatingNodeId() == null) @@ -2695,21 +2694,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteUuid id = msg0.id(); - boolean res = idxDiscoMsgIdHist.add(id); - - if (!idxDiscoMsgIdHist.add(id)) + if (!idxDiscoMsgIdHist.add(id)) { U.warn(log, "Received duplicate index change discovery message (will ignore): " + msg); - else { - if (msg instanceof IndexProposeDiscoveryMessage) - onIndexProposeMessage((IndexProposeDiscoveryMessage)msg); - else if (msg instanceof IndexAcceptDiscoveryMessage) - onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg); - else if (msg instanceof IndexFinishDiscoveryMessage) - onIndexFinishMessage((IndexFinishDiscoveryMessage)msg); - else - U.warn(log, "Unsupported index discovery message type (will ignore): " + msg); + + return false; } + if (msg instanceof IndexProposeDiscoveryMessage) + onIndexProposeMessage((IndexProposeDiscoveryMessage)msg); + else if (msg instanceof IndexAcceptDiscoveryMessage) + onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg); + else if (msg instanceof IndexFinishDiscoveryMessage) + onIndexFinishMessage((IndexFinishDiscoveryMessage)msg); + else + U.warn(log, "Unsupported index discovery message type (will ignore): " + msg); + return false; } @@ -2741,20 +2740,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = cacheDescriptor(op.space()); if (desc == null) { - msg.onError(locNodeId, "Cache doesn't exit on node [cacheName=" + op.space() + - ". nodeId=" + locNodeId + ']'); + msg.onError(locNodeId, "Cache doesn't exit [cacheName=" + op.space() + ", nodeId=" + locNodeId + ']'); return; } - // Validate request at descriptor level. - QueryIndexStates idxStates = desc.indexStates(); - - if (idxStates == null) - idxStates = new QueryIndexStates(); - - if (idxStates.propose(locNodeId, msg)) - desc.indexStates(idxStates); + desc.tryProposeFromDiscoveryThread(locNodeId, msg); } /** @@ -2770,12 +2761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (desc == null) return; - QueryIndexStates idxStates = desc.indexStates(); - - if (idxStates == null || !idxStates.accept(msg)) - return; - - ctx.query().onIndexAccept(op.space(), op); + desc.tryAccept(msg, true); } /** @@ -2784,9 +2770,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param msg Message. */ private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) { - // TODO: Clear dynamic descriptors! + AbstractIndexOperation op = msg.operation(); + + DynamicCacheDescriptor desc = cacheDescriptor(op.space()); + + if (desc == null) + return; - // TODO: Delegate to indexing to handle result and complete client futures! + desc.tryFinish(msg, true); } /** @@ -2858,8 +2849,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId()); - startDesc.initialIndexStates(req.indexStates()); - startDesc.indexStates(req.indexStates()); + startDesc.tryUpdateFromDiscovery(req.indexStates()); if (newTopVer == null) { newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java index 7e8e699..ea36c77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java @@ -79,9 +79,8 @@ public class QueryIndexStates implements Serializable { * * @param locNodeId Local node ID. * @param msg Propose message. - * @return {@code True} if propose succeeded. */ - public boolean propose(UUID locNodeId, IndexProposeDiscoveryMessage msg) { + public void propose(UUID locNodeId, IndexProposeDiscoveryMessage msg) { synchronized (mux) { AbstractIndexOperation op = msg.operation(); @@ -90,13 +89,9 @@ public class QueryIndexStates implements Serializable { if (activeOps.containsKey(idxName)) { msg.onError(locNodeId, "Failed to initiate index create/drop because another operation on the same " + "index is in progress: " + idxName); - - return false; } activeOps.put(idxName, new QueryIndexActiveOperation(op)); - - return true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4376670b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java index c362b91..11d8f93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java @@ -38,6 +38,9 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe @GridToStringInclude protected final AbstractIndexOperation op; + /** Whether request must be propagated to exchange worker for final processing. */ + private transient boolean exchange; + /** Local cache index state at the moment of message receive. */ private transient QueryIndexStates idxStates; @@ -76,6 +79,20 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe this.idxStates = idxStates; } + /** + * @return Whether request must be propagated to exchange worker for final processing. + */ + public boolean exchange() { + return exchange; + } + + /** + * @param exchange Whether request must be propagated to exchange worker for final processing. + */ + public void exchange(boolean exchange) { + this.exchange = exchange; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IndexAbstractDiscoveryMessage.class, this);
