Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl b2fd40b57 -> 9f7b8aba4


WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cfe4aebb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cfe4aebb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cfe4aebb

Branch: refs/heads/ignite-4565-ddl
Commit: cfe4aebb7232362ab7cbebd9952ce5f7906b8713
Parents: b2fd40b
Author: devozerov <[email protected]>
Authored: Tue Mar 21 16:06:20 2017 +0300
Committer: devozerov <[email protected]>
Committed: Tue Mar 21 16:06:20 2017 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheDescriptor.java           | 35 ++++----------------
 .../processors/cache/GridCacheProcessor.java    | 10 ++++--
 .../processors/query/QueryIndexStates.java      | 26 +++++----------
 .../query/ddl/IndexAcceptDiscoveryMessage.java  | 17 ++++++++++
 4 files changed, 38 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index fdaa8ed..654f438 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -354,18 +354,16 @@ public class DynamicCacheDescriptor {
         synchronized (idxStatesMux) {
             if (!idxStatesForStartFixed)
                 this.idxStates = idxStates != null ? idxStates.copy() : null;
-
-            // TODO: Validate that both states are compatible?
         }
     }
 
     /**
-     * Try performing propose from discovery thread.
+     * Try applying propose message.
      *
      * @param locNodeId Local node ID.
      * @param msg Message.
      */
-    public void tryProposeFromDiscoveryThread(UUID locNodeId, 
IndexProposeDiscoveryMessage msg) {
+    public void tryPropose(UUID locNodeId, IndexProposeDiscoveryMessage msg) {
         synchronized (idxStatesMux) {
             if (idxStates == null)
                 idxStates = new QueryIndexStates();
@@ -378,17 +376,13 @@ public class DynamicCacheDescriptor {
      * Try applying accept message.
      *
      * @param msg Message.
-     * @return Result.
      */
-    public boolean tryAccept(IndexAcceptDiscoveryMessage msg) {
+    public void tryAccept(IndexAcceptDiscoveryMessage msg) {
         synchronized (idxStatesMux) {
-            if (idxStatesForStartFixed)
-                msg.exchange(true);
-
             if (idxStates == null)
                 idxStates = new QueryIndexStates();
 
-            return idxStates.accept(msg);
+            idxStates.accept(msg);
         }
     }
 
@@ -396,30 +390,13 @@ public class DynamicCacheDescriptor {
      * Try applying finish message.
      *
      * @param msg Message.
-     * @return Result.
      */
-    public boolean tryFinish(IndexFinishDiscoveryMessage msg) {
+    public void tryFinish(IndexFinishDiscoveryMessage msg) {
         synchronized (idxStatesMux) {
-            if (idxStatesForStartFixed)
-                msg.exchange(true);
-
             if (idxStates == null)
                 idxStates = new QueryIndexStates();
 
-            return idxStates.finish(msg);
-        }
-    }
-
-    /**
-     * Forcefully update index states from exchange thread.
-     *
-     * @param idxStates Index states.
-     */
-    public void updateIndexStatesFromExchange(QueryIndexStates idxStates) {
-        synchronized (idxStatesMux) {
-            assert idxStatesForStartFixed;
-
-            this.idxStates = idxStates != null ? idxStates.copy() : null;
+            idxStates.finish(msg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5718213..2aa004c 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2789,7 +2789,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             return;
         }
 
-        desc.tryProposeFromDiscoveryThread(locNodeId, msg);
+        desc.tryPropose(locNodeId, msg);
     }
 
     /**
@@ -2803,9 +2803,11 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         DynamicCacheDescriptor desc = cacheDescriptor(op.space());
 
         if (desc == null)
-            return;
+            msg.onError("Cache was stopped concurrently.");
+        else
+            desc.tryAccept(msg);
 
-        desc.tryAccept(msg);
+        msg.exchange(true);
     }
 
     /**
@@ -2822,6 +2824,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             return;
 
         desc.tryFinish(msg);
+
+        msg.exchange(true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
index 7c955f8..a222203 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query;
 
-import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
@@ -99,9 +98,8 @@ public class QueryIndexStates implements Serializable {
      * Process accept message propagating index from proposed to accepted 
state.
      *
      * @param msg Message.
-     * @return {@code True} if accept succeeded. It may fail in case of 
concurrent cache stop/start.
      */
-    public boolean accept(IndexAcceptDiscoveryMessage msg) {
+    public void accept(IndexAcceptDiscoveryMessage msg) {
         synchronized (mux) {
             AbstractIndexOperation op = msg.operation();
 
@@ -109,17 +107,14 @@ public class QueryIndexStates implements Serializable {
 
             QueryIndexActiveOperation curOp = activeOps.get(idxName);
 
-            if (curOp != null) {
-                if (F.eq(curOp.operation().operationId(), op.operationId())) {
-                    assert !curOp.accepted();
+            if (curOp != null && F.eq(curOp.operation().operationId(), 
op.operationId())) {
+                assert !curOp.accepted();
 
-                    curOp.accept();
-
-                    return true;
-                }
+                curOp.accept();
             }
-
-            return false;
+            else
+                msg.onError("failed to apply dynamic index change operation 
because cache state changed " +
+                    "concurrently.");
         }
     }
 
@@ -127,9 +122,8 @@ public class QueryIndexStates implements Serializable {
      * Process finish message.
      *
      * @param msg Message.
-     * @return {@code True} if accept succeeded. It may fail in case of 
concurrent cache stop/start.
      */
-    public boolean finish(IndexFinishDiscoveryMessage msg) {
+    public void finish(IndexFinishDiscoveryMessage msg) {
         synchronized (mux) {
             AbstractIndexOperation op = msg.operation();
 
@@ -153,12 +147,8 @@ public class QueryIndexStates implements Serializable {
 
                         readyOps.put(idxName, state);
                     }
-
-                    return true;
                 }
             }
-
-            return false;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cfe4aebb/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
index d0fed43..a7c756f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAcceptDiscoveryMessage.java
@@ -30,6 +30,9 @@ public class IndexAcceptDiscoveryMessage extends 
IndexAbstractDiscoveryMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Error message. */
+    private transient volatile String errMsg;
+
     /**
      * Constructor.
      *
@@ -49,6 +52,20 @@ public class IndexAcceptDiscoveryMessage extends 
IndexAbstractDiscoveryMessage {
         return false;
     }
 
+    /**
+     * @return Error message.
+     */
+    @Nullable public String onError() {
+        return errMsg;
+    }
+
+    /**
+     * @param errMsg Error message.
+     */
+    public void onError(String errMsg) {
+        this.errMsg = errMsg;
+    }
+
 
     /** {@inheritDoc} */
     @Override public String toString() {

Reply via email to