Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl fe3407333 -> 69cac4f3a


Propagated cache start to index worker.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69cac4f3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69cac4f3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69cac4f3

Branch: refs/heads/ignite-4565-ddl
Commit: 69cac4f3afce412d432fed89f621f864549cb2d3
Parents: fe34073
Author: devozerov <voze...@gridgain.com>
Authored: Mon Mar 20 11:12:26 2017 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Mon Mar 20 11:12:26 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    | 86 +++++++++++++++-----
 .../processors/query/QueryIndexStates.java      | 38 ---------
 2 files changed, 65 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/69cac4f3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index b683bef..32322c3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -63,9 +63,7 @@ import 
org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
-import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
 import 
org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
-import 
org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusMessage;
 import 
org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -79,6 +77,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -115,6 +114,9 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     /** Index create/drop client futures. */
     private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new 
ConcurrentHashMap<>();
 
+    /** Index worker. */
+    private final DynamicIndexManagerWorker idxWorker;
+
     /** */
     private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
 
@@ -134,6 +136,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         }
         else
             idx = INDEXING.inClassPath() ? 
U.<GridQueryIndexing>newInstance(INDEXING.className()) : null;
+
+        idxWorker = new DynamicIndexManagerWorker(ctx.igniteInstanceName(), 
log);
     }
 
     /** {@inheritDoc} */
@@ -146,6 +150,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             idx.start(ctx, busyLock);
         }
 
+        new IgniteThread(idxWorker).start();
+
         // Schedule queries detail metrics eviction.
         qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
             @Override public void run() {
@@ -174,11 +180,11 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
     /**
      * @param cctx Cache context.
-     * @param idxStates Index states.
+     * @param initIdxStates Index states.
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("deprecation")
-    private void initializeCache(GridCacheContext<?, ?> cctx, @Nullable 
QueryIndexStates idxStates)
+    private void initializeCache(GridCacheContext<?, ?> cctx, @Nullable 
QueryIndexStates initIdxStates)
         throws IgniteCheckedException {
         String space = cctx.name();
 
@@ -207,16 +213,20 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         }
 
         // Apply dynamic changes to candidates.
-        if (idxStates != null) {
-            Map<String, QueryIndexState> readyIdxStates = 
idxStates.readyOperations();
+        if (initIdxStates != null) {
+            Map<String, QueryIndexState> readyIdxStates = 
initIdxStates.readyOperations();
 
             for (QueryTypeCandidate cand : cands)
-                applyInitialDelta(cand.descriptor(), readyIdxStates);
+                applyReadyDynamicOperations(cand.descriptor(), readyIdxStates);
         }
 
         // Ready to register at this point.
         registerCache0(space, cctx, cands);
 
+        // Active operations will be applied from worker thread.
+        if (initIdxStates != null)
+            idxWorker.onCacheStart(space, initIdxStates);
+
         // Warn about possible implicit deserialization.
         if (!mustDeserializeClss.isEmpty()) {
             U.warn(log, "Some classes in query configuration cannot be written 
in binary format " +
@@ -234,26 +244,14 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      * @param desc Descriptor.
      * @param idxStates Index states.
      */
-    private void applyInitialDelta(QueryTypeDescriptorImpl desc, Map<String, 
QueryIndexState> idxStates)
+    private void applyReadyDynamicOperations(QueryTypeDescriptorImpl desc, 
Map<String, QueryIndexState> idxStates)
         throws IgniteCheckedException {
         for (Map.Entry<String, QueryIndexState> entry : idxStates.entrySet()) {
             String idxName = entry.getKey();
             QueryIndexState idxState = entry.getValue();
 
-
+            QueryUtils.processDynamicIndexChange(idxName, idxState.index(), 
desc);
         }
-
-        // TODO
-//        Map<String, QueryIndex> delta = 
idxStates.initialDelta(desc.tableName());
-//
-//        if (!F.isEmpty(delta)) {
-//            for (Map.Entry<String, QueryIndex> deltaEntry : 
delta.entrySet()) {
-//                String idxName = deltaEntry.getKey();
-//                QueryIndex idx = deltaEntry.getValue();
-//
-//                QueryUtils.processDynamicIndexChange(idxName, idx, desc);
-//            }
-//        }
     }
 
     /** {@inheritDoc} */
@@ -1292,6 +1290,16 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         }
 
         /**
+         * Cache started callback.
+         *
+         * @param space Space.
+         * @param initIdxStates Initial index states.
+         */
+        public void onCacheStart(String space, QueryIndexStates initIdxStates) 
{
+            submit(new CacheStartTask(space, initIdxStates));
+        }
+
+        /**
          * Update topology in response to node leave event.
          */
         private void updateTopology() {
@@ -1351,6 +1359,42 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Cache start task.
+     */
+    private static class CacheStartTask implements DynamicIndexTask {
+        /** Space. */
+        private final String space;
+
+        /** Initial index states. */
+        private final QueryIndexStates initIdxStates;
+
+        /**
+         * Constructor.
+         *
+         * @param space Space.
+         * @param initIdxStates Initial index states.
+         */
+        public CacheStartTask(String space, QueryIndexStates initIdxStates) {
+            this.space = space;
+            this.initIdxStates = initIdxStates;
+        }
+
+        /**
+         * @return Space.
+         */
+        public String space() {
+            return space;
+        }
+
+        /**
+         * @return Initial index states.
+         */
+        public QueryIndexStates initialIndexStates() {
+            return initIdxStates;
+        }
+    }
+
+    /**
      * Change index task.
      */
     private static class ChangeIndexTask implements DynamicIndexTask {

http://git-wip-us.apache.org/repos/asf/ignite/blob/69cac4f3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
index ea36c77..314d9ae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java
@@ -163,44 +163,6 @@ public class QueryIndexStates implements Serializable {
     }
 
     /**
-     * Get initial delta to be applied to the table.
-     * @param tblName Table name.
-     * @return Delta.
-     */
-    public Map<String, QueryIndex> initialDelta(String tblName) {
-        synchronized (mux) {
-            Map<String, QueryIndex> res = new HashMap<>();
-
-            for (QueryIndexState idxState : readyOps.values()) {
-                if (F.eq(tblName, idxState.tableName()))
-                    res.put(idxState.indexName(), idxState.index());
-            }
-
-            for (Map.Entry<String, QueryIndexActiveOperation> op : 
activeOps.entrySet()) {
-                if (op.getValue().accepted()) {
-                    AbstractIndexOperation op0 = op.getValue().operation();
-
-                    if (F.eq(tblName, op0.tableName())) {
-                        QueryIndex idx;
-
-                        if (op0 instanceof CreateIndexOperation)
-                            idx = ((CreateIndexOperation)op0).index();
-                        else {
-                            assert op0 instanceof DropIndexOperation;
-
-                            idx = null;
-                        }
-
-                        res.put(op0.indexName(), idx);
-                    }
-                }
-            }
-
-            return res;
-        }
-    }
-
-    /**
      * @return Accepted active operations.
      */
     public Map<String, QueryIndexActiveOperation> acceptedActiveOperations() {

Reply via email to