Repository: ignite Updated Branches: refs/heads/ignite-4565-ddl b2fd40b57 -> 9f7b8aba4
WIP. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cfe4aebb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cfe4aebb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cfe4aebb Branch: refs/heads/ignite-4565-ddl Commit: cfe4aebb7232362ab7cbebd9952ce5f7906b8713 Parents: b2fd40b Author: devozerov <[email protected]> Authored: Tue Mar 21 16:06:20 2017 +0300 Committer: devozerov <[email protected]> Committed: Tue Mar 21 16:06:20 2017 +0300 ---------------------------------------------------------------------- .../cache/DynamicCacheDescriptor.java | 35 ++++---------------- .../processors/cache/GridCacheProcessor.java | 10 ++++-- .../processors/query/QueryIndexStates.java | 26 +++++---------- .../query/ddl/IndexAcceptDiscoveryMessage.java | 17 ++++++++++ 4 files changed, 38 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index fdaa8ed..654f438 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -354,18 +354,16 @@ public class DynamicCacheDescriptor { synchronized (idxStatesMux) { if (!idxStatesForStartFixed) this.idxStates = idxStates != null ? idxStates.copy() : null; - - // TODO: Validate that both states are compatible? } } /** - * Try performing propose from discovery thread. + * Try applying propose message. * * @param locNodeId Local node ID. * @param msg Message. */ - public void tryProposeFromDiscoveryThread(UUID locNodeId, IndexProposeDiscoveryMessage msg) { + public void tryPropose(UUID locNodeId, IndexProposeDiscoveryMessage msg) { synchronized (idxStatesMux) { if (idxStates == null) idxStates = new QueryIndexStates(); @@ -378,17 +376,13 @@ public class DynamicCacheDescriptor { * Try applying accept message. * * @param msg Message. - * @return Result. */ - public boolean tryAccept(IndexAcceptDiscoveryMessage msg) { + public void tryAccept(IndexAcceptDiscoveryMessage msg) { synchronized (idxStatesMux) { - if (idxStatesForStartFixed) - msg.exchange(true); - if (idxStates == null) idxStates = new QueryIndexStates(); - return idxStates.accept(msg); + idxStates.accept(msg); } } @@ -396,30 +390,13 @@ public class DynamicCacheDescriptor { * Try applying finish message. * * @param msg Message. - * @return Result. */ - public boolean tryFinish(IndexFinishDiscoveryMessage msg) { + public void tryFinish(IndexFinishDiscoveryMessage msg) { synchronized (idxStatesMux) { - if (idxStatesForStartFixed) - msg.exchange(true); - if (idxStates == null) idxStates = new QueryIndexStates(); - return idxStates.finish(msg); - } - } - - /** - * Forcefully update index states from exchange thread. - * - * @param idxStates Index states. - */ - public void updateIndexStatesFromExchange(QueryIndexStates idxStates) { - synchronized (idxStatesMux) { - assert idxStatesForStartFixed; - - this.idxStates = idxStates != null ? idxStates.copy() : null; + idxStates.finish(msg); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5718213..2aa004c 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2789,7 +2789,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; } - desc.tryProposeFromDiscoveryThread(locNodeId, msg); + desc.tryPropose(locNodeId, msg); } /** @@ -2803,9 +2803,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = cacheDescriptor(op.space()); if (desc == null) - return; + msg.onError("Cache was stopped concurrently."); + else + desc.tryAccept(msg); - desc.tryAccept(msg); + msg.exchange(true); } /** @@ -2822,6 +2824,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; desc.tryFinish(msg); + + msg.exchange(true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java index 7c955f8..a222203 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query; -import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation; import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation; import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation; @@ -99,9 +98,8 @@ public class QueryIndexStates implements Serializable { * Process accept message propagating index from proposed to accepted state. * * @param msg Message. - * @return {@code True} if accept succeeded. It may fail in case of concurrent cache stop/start. */ - public boolean accept(IndexAcceptDiscoveryMessage msg) { + public void accept(IndexAcceptDiscoveryMessage msg) { synchronized (mux) { AbstractIndexOperation op = msg.operation(); @@ -109,17 +107,14 @@ public class QueryIndexStates implements Serializable { QueryIndexActiveOperation curOp = activeOps.get(idxName); - if (curOp != null) { - if (F.eq(curOp.operation().operationId(), op.operationId())) { - assert !curOp.accepted(); + if (curOp != null && F.eq(curOp.operation().operationId(), op.operationId())) { + assert !curOp.accepted(); - curOp.accept(); - - return true; - } + curOp.accept(); } - - return false; + else + msg.onError("failed to apply dynamic index change operation because cache state changed " + + "concurrently."); } } @@ -127,9 +122,8 @@ public class QueryIndexStates implements Serializable { * Process finish message. * * @param msg Message. - * @return {@code True} if accept succeeded. It may fail in case of concurrent cache stop/start. */ - public boolean finish(IndexFinishDiscoveryMessage msg) { + public void finish(IndexFinishDiscoveryMessage msg) { synchronized (mux) { AbstractIndexOperation op = msg.operation(); @@ -153,12 +147,8 @@ public class QueryIndexStates implements Serializable { readyOps.put(idxName, state); } - - return true; } } - - return false; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java index d0fed43..a7c756f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java @@ -30,6 +30,9 @@ public class IndexAcceptDiscoveryMessage extends IndexAbstractDiscoveryMessage { /** */ private static final long serialVersionUID = 0L; + /** Error message. */ + private transient volatile String errMsg; + /** * Constructor. * @@ -49,6 +52,20 @@ public class IndexAcceptDiscoveryMessage extends IndexAbstractDiscoveryMessage { return false; } + /** + * @return Error message. + */ + @Nullable public String onError() { + return errMsg; + } + + /** + * @param errMsg Error message. + */ + public void onError(String errMsg) { + this.errMsg = errMsg; + } + /** {@inheritDoc} */ @Override public String toString() {
