Added IO message management with correct handling of messages received too early.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f7b8aba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f7b8aba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f7b8aba

Branch: refs/heads/ignite-4565-ddl
Commit: 9f7b8aba4b82b4e5f5d83c53bae4c08a9afd0342
Parents: f3cfae0
Author: devozerov <[email protected]>
Authored: Tue Mar 21 16:35:28 2017 +0300
Committer: devozerov <[email protected]>
Committed: Tue Mar 21 16:35:28 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../processors/query/GridQueryProcessor.java    | 218 ++++++++++++++++---
 .../query/ddl/IndexOperationStatusRequest.java  |  30 ++-
 .../query/ddl/IndexOperationStatusResponse.java |  34 ++-
 4 files changed, 250 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 3ffc56e..8c6f9aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -103,7 +103,10 @@ public enum GridTopic {
     TOPIC_MAPPING_MARSH,
 
     /** */
-    TOPIC_HADOOP_MSG;
+    TOPIC_HADOOP_MSG,
+
+    /** */
+    TOPIC_DYNAMIC_SCHEMA;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 77dc0a3..3c1a8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.cache.CacheTypeMetadata;
 import org.apache.ignite.cache.QueryEntity;
@@ -32,6 +33,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -47,6 +50,8 @@ import 
org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
 import 
org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken;
+import 
org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusRequest;
+import 
org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusResponse;
 import 
org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -56,9 +61,11 @@ import 
org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 import javax.cache.Cache;
@@ -71,13 +78,16 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_DYNAMIC_SCHEMA;
 import static org.apache.ignite.internal.IgniteComponentType.INDEXING;
 
 /**
@@ -117,6 +127,21 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     /** Index create/drop client futures. */
     private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new 
ConcurrentHashMap<>();
 
+    /** IO message listener. */
+    private final GridMessageListener ioLsnr;
+
+    /** Queue with pending IO messages. */
+    private final Queue<Object> ioMsgs = new ConcurrentLinkedDeque<>();
+
+    /** IO init lock. */
+    private final ReadWriteLock ioInitLock = new ReentrantReadWriteLock();
+
+    /** IO init flag. */
+    private volatile boolean ioInit;
+
+    /** IO worker that processes IO messages which arrived too early (before kernal start). */
+    private volatile GridWorker ioWorker;
+
     /**
      * @param ctx Kernal context.
      */
@@ -130,6 +155,12 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         }
         else
             idx = INDEXING.inClassPath() ? 
U.<GridQueryIndexing>newInstance(INDEXING.className()) : null;
+
+        ioLsnr = new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object obj) {
+                dispatchIoMessage(obj);
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -142,6 +173,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             idx.start(ctx, busyLock);
         }
 
+        ctx.io().addMessageListener(TOPIC_DYNAMIC_SCHEMA, ioLsnr);
+
         // Schedule queries detail metrics eviction.
         qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
             @Override public void run() {
@@ -151,6 +184,80 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         }, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        // Start IO worker to consume IO messages that were queued before IO was initialized.
+        boolean startIoWorker = false;
+
+        ioInitLock.writeLock().lock();
+
+        try {
+            if (!ioMsgs.isEmpty())
+                startIoWorker = true;
+
+            ioInit = true;
+        }
+        finally {
+            ioInitLock.writeLock().unlock();
+        }
+
+        if (startIoWorker) {
+            ioWorker = new IoWorker(ctx.igniteInstanceName(), 
"query-proc-io-worker", log);
+
+            new IgniteThread(ioWorker).start();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        GridWorker ioWorker0 = ioWorker;
+
+        if (ioWorker0 != null) {
+            ioWorker0.cancel();
+
+            try {
+                ioWorker0.join();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                if (log.isDebugEnabled())
+                    log.debug("Got interrupted while waiting for IO worker to 
finish.");
+            }
+        }
+
+        if (cancel && idx != null) {
+            try {
+                while (!busyLock.tryBlock(500))
+                    idx.cancelAllQueries();
+
+                return;
+            } catch (InterruptedException ignored) {
+                U.warn(log, "Interrupted while waiting for active queries 
cancellation.");
+
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        busyLock.block();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
+
+        ctx.io().removeMessageListener(TOPIC_DYNAMIC_SCHEMA, ioLsnr);
+
+        if (idx != null)
+            idx.stop();
+
+        U.closeQuiet(qryDetailMetricsEvictTask);
+    }
+
     /**
      * @return {@code true} If indexing module is in classpath and 
successfully initialized.
      */
@@ -252,36 +359,6 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        if (cancel && idx != null) {
-            try {
-                while (!busyLock.tryBlock(500))
-                    idx.cancelAllQueries();
-
-                return;
-            } catch (InterruptedException ignored) {
-                U.warn(log, "Interrupted while waiting for active queries 
cancellation.");
-
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        busyLock.block();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        super.stop(cancel);
-
-        if (idx != null)
-            idx.stop();
-
-        U.closeQuiet(qryDetailMetricsEvictTask);
-    }
-
-    /** {@inheritDoc} */
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws 
IgniteCheckedException {
         if (idx != null)
             idx.onDisconnected(reconnectFut);
@@ -1190,6 +1267,59 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Dispatch IO message.
+     *
+     * @param msg Message.
+     */
+    private void dispatchIoMessage(Object msg) {
+        if (!ioInit) {
+            ioInitLock.readLock().lock();
+
+            try {
+                if (!ioInit) {
+                    ioMsgs.add(msg);
+
+                    return;
+                }
+            }
+            finally {
+                ioInitLock.readLock().unlock();
+            }
+        }
+
+        if (msg instanceof IndexOperationStatusRequest) {
+            IndexOperationStatusRequest req = (IndexOperationStatusRequest)msg;
+
+            processStatusRequest(req);
+        }
+        else if (msg instanceof IndexOperationStatusResponse) {
+            IndexOperationStatusResponse resp = 
(IndexOperationStatusResponse)msg;
+
+            processStatusResponse(resp);
+        }
+        else
+            U.warn(log, "Unsupported IO message: " + msg);
+    }
+
+    /**
+     * Process status request.
+     *
+     * @param req Status request.
+     */
+    private void processStatusRequest(IndexOperationStatusRequest req) {
+        // TODO
+    }
+
+    /**
+     * Process status response.
+     *
+     * @param resp Status response.
+     */
+    private void processStatusResponse(IndexOperationStatusResponse resp) {
+        // TODO
+    }
+
+    /**
      * @param ver Version.
      */
     public static void 
setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {
@@ -1202,4 +1332,32 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     public static AffinityTopologyVersion getRequestAffinityTopologyVersion() {
         return requestTopVer.get();
     }
+
+    /**
+     * IO worker to process pending IO messages.
+     */
+    private class IoWorker extends GridWorker {
+        /**
+         * Constructor.
+         *
+         * @param igniteInstanceName Ignite instance name.
+         * @param name Worker name.
+         * @param log Logger.
+         */
+        public IoWorker(@Nullable String igniteInstanceName, String name, 
IgniteLogger log) {
+            super(igniteInstanceName, name, log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                Object msg = ioMsgs.poll();
+
+                if (msg == null)
+                    break;
+
+                dispatchIoMessage(msg);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java
index 766eecf..462873e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java
@@ -32,6 +32,9 @@ public class IndexOperationStatusRequest implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Sender node ID. */
+    private UUID sndNodeId;
+
     /** Operation ID. */
     private UUID opId;
 
@@ -45,13 +48,22 @@ public class IndexOperationStatusRequest implements Message 
{
     /**
      * Constructor.
      *
+     * @param sndNodeId Sender node ID.
      * @param opId Operation ID.
      */
-    public IndexOperationStatusRequest(UUID opId, String errMsg) {
+    public IndexOperationStatusRequest(UUID sndNodeId, UUID opId) {
+        this.sndNodeId = sndNodeId;
         this.opId = opId;
     }
 
     /**
+     * @return Sender node ID.
+     */
+    public UUID senderNodeId() {
+        return sndNodeId;
+    }
+
+    /**
      * @return Operation ID.
      */
     public UUID operationId() {
@@ -71,6 +83,12 @@ public class IndexOperationStatusRequest implements Message {
 
         switch (writer.state()) {
             case 0:
+                if (!writer.writeUuid("sndNodeId", sndNodeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
                 if (!writer.writeUuid("opId", opId))
                     return false;
 
@@ -89,6 +107,14 @@ public class IndexOperationStatusRequest implements Message 
{
 
         switch (reader.state()) {
             case 0:
+                sndNodeId = reader.readUuid("sndNodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
                 opId = reader.readUuid("opId");
 
                 if (!reader.isLastRead())
@@ -107,7 +133,7 @@ public class IndexOperationStatusRequest implements Message 
{
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 1;
+        return 2;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java
index e9220c9..0ba3494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java
@@ -33,6 +33,9 @@ public class IndexOperationStatusResponse implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Sender node ID. */
+    private UUID sndNodeId;
+
     /** Operation ID. */
     private UUID opId;
 
@@ -49,15 +52,24 @@ public class IndexOperationStatusResponse implements 
Message {
     /**
      * Constructor.
      *
+     * @param sndNodeId Sender node ID.
      * @param opId Operation ID.
      * @param errMsg Error message.
      */
-    public IndexOperationStatusResponse(UUID opId, String errMsg) {
+    public IndexOperationStatusResponse(UUID sndNodeId, UUID opId, String 
errMsg) {
+        this.sndNodeId = sndNodeId;
         this.opId = opId;
         this.errMsg = errMsg;
     }
 
     /**
+     * @return Sender node ID.
+     */
+    public UUID senderNodeId() {
+        return sndNodeId;
+    }
+
+    /**
      * @return Operation ID.
      */
     public UUID operationId() {
@@ -84,12 +96,18 @@ public class IndexOperationStatusResponse implements 
Message {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeUuid("opId", opId))
+                if (!writer.writeUuid("sndNodeId", sndNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 1:
+                if (!writer.writeUuid("opId", opId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
                 if (!writer.writeString("errMsg", errMsg))
                     return false;
 
@@ -108,7 +126,7 @@ public class IndexOperationStatusResponse implements 
Message {
 
         switch (reader.state()) {
             case 0:
-                opId = reader.readUuid("opId");
+                sndNodeId = reader.readUuid("sndNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -116,6 +134,14 @@ public class IndexOperationStatusResponse implements 
Message {
                 reader.incrementState();
 
             case 1:
+                opId = reader.readUuid("opId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
                 errMsg = reader.readString("errMsg");
 
                 if (!reader.isLastRead())
@@ -134,7 +160,7 @@ public class IndexOperationStatusResponse implements 
Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 2;
+        return 3;
     }
 
     /** {@inheritDoc} */

Reply via email to