Added IO management with correct handling of messages that arrive too early.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f7b8aba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f7b8aba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f7b8aba Branch: refs/heads/ignite-4565-ddl Commit: 9f7b8aba4b82b4e5f5d83c53bae4c08a9afd0342 Parents: f3cfae0 Author: devozerov <[email protected]> Authored: Tue Mar 21 16:35:28 2017 +0300 Committer: devozerov <[email protected]> Committed: Tue Mar 21 16:35:28 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/GridTopic.java | 5 +- .../processors/query/GridQueryProcessor.java | 218 ++++++++++++++++--- .../query/ddl/IndexOperationStatusRequest.java | 30 ++- .../query/ddl/IndexOperationStatusResponse.java | 34 ++- 4 files changed, 250 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 3ffc56e..8c6f9aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -103,7 +103,10 @@ public enum GridTopic { TOPIC_MAPPING_MARSH, /** */ - TOPIC_HADOOP_MSG; + TOPIC_HADOOP_MSG, + + /** */ + TOPIC_DYNAMIC_SCHEMA; /** Enum values. 
*/ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 77dc0a3..3c1a8ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.Binarylizable; import org.apache.ignite.cache.CacheTypeMetadata; import org.apache.ignite.cache.QueryEntity; @@ -32,6 +33,8 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -47,6 +50,8 @@ import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation; import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage; import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage; import 
org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken; +import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusRequest; +import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusResponse; import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -56,9 +61,11 @@ import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import javax.cache.Cache; @@ -71,13 +78,16 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; +import static org.apache.ignite.internal.GridTopic.TOPIC_DYNAMIC_SCHEMA; import static org.apache.ignite.internal.IgniteComponentType.INDEXING; /** @@ -117,6 +127,21 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** Index create/drop client futures. */ private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new ConcurrentHashMap<>(); + /** IO message listener. 
*/ + private final GridMessageListener ioLsnr; + + /** Queue with pending IO messages. */ + private final Queue<Object> ioMsgs = new ConcurrentLinkedDeque<>(); + + /** IO init lock. */ + private final ReadWriteLock ioInitLock = new ReentrantReadWriteLock(); + + /** IO init flag. */ + private volatile boolean ioInit; + + /** IO worker to process too early IO messages. */ + private volatile GridWorker ioWorker; + /** * @param ctx Kernal context. */ @@ -130,6 +155,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { } else idx = INDEXING.inClassPath() ? U.<GridQueryIndexing>newInstance(INDEXING.className()) : null; + + ioLsnr = new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object obj) { + dispatchIoMessage(obj); + } + }; } /** {@inheritDoc} */ @@ -142,6 +173,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { idx.start(ctx, busyLock); } + ctx.io().addMessageListener(TOPIC_DYNAMIC_SCHEMA, ioLsnr); + // Schedule queries detail metrics eviction. qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() { @Override public void run() { @@ -151,6 +184,80 @@ public class GridQueryProcessor extends GridProcessorAdapter { }, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ); } + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + // Start IO worker to consume racy IO messages. 
+ boolean startIoWorker = false; + + ioInitLock.writeLock().lock(); + + try { + if (!ioMsgs.isEmpty()) + startIoWorker = true; + + ioInit = true; + } + finally { + ioInitLock.writeLock().unlock(); + } + + if (startIoWorker) { + ioWorker = new IoWorker(ctx.igniteInstanceName(), "query-proc-io-worker", log); + + new IgniteThread(ioWorker).start(); + } + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + GridWorker ioWorker0 = ioWorker; + + if (ioWorker0 != null) { + ioWorker0.cancel(); + + try { + ioWorker0.join(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + if (log.isDebugEnabled()) + log.debug("Got interrupted while waiting for IO worker to finish."); + } + } + + if (cancel && idx != null) { + try { + while (!busyLock.tryBlock(500)) + idx.cancelAllQueries(); + + return; + } catch (InterruptedException ignored) { + U.warn(log, "Interrupted while waiting for active queries cancellation."); + + Thread.currentThread().interrupt(); + } + } + + busyLock.block(); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + ctx.io().removeMessageListener(TOPIC_DYNAMIC_SCHEMA, ioLsnr); + + if (idx != null) + idx.stop(); + + U.closeQuiet(qryDetailMetricsEvictTask); + } + /** * @return {@code true} If indexing module is in classpath and successfully initialized. 
*/ @@ -252,36 +359,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - if (cancel && idx != null) { - try { - while (!busyLock.tryBlock(500)) - idx.cancelAllQueries(); - - return; - } catch (InterruptedException ignored) { - U.warn(log, "Interrupted while waiting for active queries cancellation."); - - Thread.currentThread().interrupt(); - } - } - - busyLock.block(); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (idx != null) - idx.stop(); - - U.closeQuiet(qryDetailMetricsEvictTask); - } - - /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { if (idx != null) idx.onDisconnected(reconnectFut); @@ -1190,6 +1267,59 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Dispatch IO message. + * + * @param msg Message. + */ + private void dispatchIoMessage(Object msg) { + if (!ioInit) { + ioInitLock.readLock().lock(); + + try { + if (!ioInit) { + ioMsgs.add(msg); + + return; + } + } + finally { + ioInitLock.readLock().unlock(); + } + } + + if (msg instanceof IndexOperationStatusRequest) { + IndexOperationStatusRequest req = (IndexOperationStatusRequest)msg; + + processStatusRequest(req); + } + else if (msg instanceof IndexOperationStatusResponse) { + IndexOperationStatusResponse resp = (IndexOperationStatusResponse)msg; + + processStatusResponse(resp); + } + else + U.warn(log, "Unsupported IO message: " + msg); + } + + /** + * Process status request. + * + * @param req Status request. + */ + private void processStatusRequest(IndexOperationStatusRequest req) { + // TODO + } + + /** + * Process status response. + * + * @param resp Status response. + */ + private void processStatusResponse(IndexOperationStatusResponse resp) { + // TODO + } + + /** * @param ver Version. 
*/ public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) { @@ -1202,4 +1332,32 @@ public class GridQueryProcessor extends GridProcessorAdapter { public static AffinityTopologyVersion getRequestAffinityTopologyVersion() { return requestTopVer.get(); } + + /** + * IO worker to process pending IO messages. + */ + private class IoWorker extends GridWorker { + /** + * Constructor. + * + * @param igniteInstanceName Ignite instance name. + * @param name Worker name. + * @param log Logger. + */ + public IoWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) { + super(igniteInstanceName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + while (!isCancelled()) { + Object msg = ioMsgs.poll(); + + if (msg == null) + break; + + dispatchIoMessage(msg); + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java index 766eecf..462873e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java @@ -32,6 +32,9 @@ public class IndexOperationStatusRequest implements Message { /** */ private static final long serialVersionUID = 0L; + /** Sender node ID. */ + private UUID sndNodeId; + /** Operation ID. */ private UUID opId; @@ -45,13 +48,22 @@ public class IndexOperationStatusRequest implements Message { /** * Constructor. 
* + * @param sndNodeId Sender node ID. * @param opId Operation ID. */ - public IndexOperationStatusRequest(UUID opId, String errMsg) { + public IndexOperationStatusRequest(UUID sndNodeId, UUID opId) { + this.sndNodeId = sndNodeId; this.opId = opId; } /** + * @return Sender node ID. + */ + public UUID senderNodeId() { + return sndNodeId; + } + + /** * @return Operation ID. */ public UUID operationId() { @@ -71,6 +83,12 @@ public class IndexOperationStatusRequest implements Message { switch (writer.state()) { case 0: + if (!writer.writeUuid("sndNodeId", sndNodeId)) + return false; + + writer.incrementState(); + + case 1: if (!writer.writeUuid("opId", opId)) return false; @@ -89,6 +107,14 @@ public class IndexOperationStatusRequest implements Message { switch (reader.state()) { case 0: + sndNodeId = reader.readUuid("sndNodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: opId = reader.readUuid("opId"); if (!reader.isLastRead()) @@ -107,7 +133,7 @@ public class IndexOperationStatusRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 1; + return 2; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9f7b8aba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java index e9220c9..0ba3494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java @@ -33,6 +33,9 @@ public class IndexOperationStatusResponse implements Message { /** */ private 
static final long serialVersionUID = 0L; + /** Sender node ID. */ + private UUID sndNodeId; + /** Operation ID. */ private UUID opId; @@ -49,15 +52,24 @@ public class IndexOperationStatusResponse implements Message { /** * Constructor. * + * @param sndNodeId Sender node ID. * @param opId Operation ID. * @param errMsg Error message. */ - public IndexOperationStatusResponse(UUID opId, String errMsg) { + public IndexOperationStatusResponse(UUID sndNodeId, UUID opId, String errMsg) { + this.sndNodeId = sndNodeId; this.opId = opId; this.errMsg = errMsg; } /** + * @return Sender node ID. + */ + public UUID senderNodeId() { + return sndNodeId; + } + + /** * @return Operation ID. */ public UUID operationId() { @@ -84,12 +96,18 @@ public class IndexOperationStatusResponse implements Message { switch (writer.state()) { case 0: - if (!writer.writeUuid("opId", opId)) + if (!writer.writeUuid("sndNodeId", sndNodeId)) return false; writer.incrementState(); case 1: + if (!writer.writeUuid("opId", opId)) + return false; + + writer.incrementState(); + + case 2: if (!writer.writeString("errMsg", errMsg)) return false; @@ -108,7 +126,7 @@ public class IndexOperationStatusResponse implements Message { switch (reader.state()) { case 0: - opId = reader.readUuid("opId"); + sndNodeId = reader.readUuid("sndNodeId"); if (!reader.isLastRead()) return false; @@ -116,6 +134,14 @@ public class IndexOperationStatusResponse implements Message { reader.incrementState(); case 1: + opId = reader.readUuid("opId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: errMsg = reader.readString("errMsg"); if (!reader.isLastRead()) @@ -134,7 +160,7 @@ public class IndexOperationStatusResponse implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 2; + return 3; } /** {@inheritDoc} */
