Repository: ignite Updated Branches: refs/heads/ignite-4565-ddl 2b7c1a2c9 -> 1b2a3dedb
WIP on operation handler. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee37450a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee37450a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee37450a Branch: refs/heads/ignite-4565-ddl Commit: ee37450a69634f29586b5f7e897c6a41322a602f Parents: 2b7c1a2 Author: devozerov <[email protected]> Authored: Mon Mar 20 15:34:35 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Mar 20 15:34:35 2017 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 76 +++++---- .../query/ddl/IndexOperationHandler.java | 161 +++++++++++++++++++ 2 files changed, 197 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 1bb3c1c..7fe83ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -222,8 +222,17 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (initIdxStates != null) { Map<String, QueryIndexState> readyIdxStates = initIdxStates.readyOperations(); - for (QueryTypeCandidate cand : cands) - applyReadyDynamicOperations(cand.descriptor(), readyIdxStates); + for (QueryTypeCandidate cand : cands) { + QueryTypeDescriptorImpl desc = cand.descriptor(); + + for (Map.Entry<String, QueryIndexState> entry : readyIdxStates.entrySet()) { + String idxName = entry.getKey(); + QueryIndexState idxState = entry.getValue(); + + if (F.eq(desc.tableName(), idxState.tableName())) + QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc); + } + } } // Ready to register at this point. @@ -245,19 +254,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Apply ready dynamic index states to not-yet-registered descriptor. + * Find current coordinator. * - * @param desc Descriptor. - * @param idxStates Index states. + * @return {@code True} if node is coordinator. */ - private void applyReadyDynamicOperations(QueryTypeDescriptorImpl desc, Map<String, QueryIndexState> idxStates) - throws IgniteCheckedException { - for (Map.Entry<String, QueryIndexState> entry : idxStates.entrySet()) { - String idxName = entry.getKey(); - QueryIndexState idxState = entry.getValue(); + private ClusterNode findCoordinator() { + ClusterNode res = null; - QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc); + for (ClusterNode node : ctx.discovery().aliveServerNodes()) { + if (res == null || res.order() > node.order()) + res = node; } + + return res; } /** {@inheritDoc} */ @@ -308,6 +317,10 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker. + * When called for the first time, we initialize topology thus understanding whether current node is coordinator + * or not. + * * @param cctx Cache context. * @param idxStates Index states. * @throws IgniteCheckedException If failed. @@ -353,33 +366,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { */ public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) { idxWorker.onAccept(msg); - } - - /** - * Handle index accept message. - * - * @param msg Message. - */ - public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) { - idxWorker.onFinish(msg); - } - - /** - * Handle node leave. - * - * @param node Node. - */ - public void onNodeLeave(ClusterNode node) { - // TODO. - } - /** - * Handle index init discovery message. - * - * @param space Space. - * @param op Operation. - */ - public void onIndexAccept(String space, AbstractIndexOperation op) { idxLock.writeLock().lock(); // TODO @@ -456,12 +443,21 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Handle index ack discovery message. + * Handle index accept message. * * @param msg Message. */ - private void onIndexAckDiscoveryMessage(String space, IndexAcceptDiscoveryMessage msg) { - // TODO + public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) { + idxWorker.onFinish(msg); + } + + /** + * Handle node leave. + * + * @param node Node. + */ + public void onNodeLeave(ClusterNode node) { + // TODO. } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java new file mode 100644 index 0000000..6932724 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.ddl; + +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.CountDownLatch; + +/** + * Index change handler. + */ +public class IndexOperationHandler { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Query processor */ + private final GridQueryProcessor qryProc; + + /** Logger. */ + private final IgniteLogger log; + + /** Target operation. */ + private final AbstractIndexOperation op; + + /** Operation future. */ + private final GridFutureAdapter opFut; + + /** Mutex for concurrent access. */ + private final Object mux = new Object(); + + /** Init flag. */ + private boolean init; + + /** Cancel flag. */ + private boolean cancel; + + /** Worker. */ + private IndexWorker worker; + + /** + * Constructor. + * + * @param ctx Context. + * @param qryProc Query processor. + * @param op Target operation. + */ + public IndexOperationHandler(GridKernalContext ctx, GridQueryProcessor qryProc, AbstractIndexOperation op) { + this.ctx = ctx; + this.qryProc = qryProc; + this.op = op; + + log = ctx.log(IndexOperationHandler.class); + opFut = new GridFutureAdapter(); + } + + /** + * Perform initialization routine. + */ + public void init() { + synchronized (mux) { + if (!init) { + init = true; + + if (!cancel) { + worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log); + + new IgniteThread(worker).start(); + + worker.awaitStart(); + } + } + } + } + + /** + * @return Worker name. + */ + private String workerName() { + return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName(); + } + + /** + * Cancel operation. + */ + public void cancel() { + synchronized (mux) { + if (!cancel) { + cancel = true; + + if (worker != null) + worker.cancel(); + } + + // TODO + } + } + + /** + * Single-shot index worker responsible for operation execution. + */ + private class IndexWorker extends GridWorker { + /** Worker start latch. */ + private final CountDownLatch startLatch = new CountDownLatch(1); + + /** + * Constructor. + * + * @param igniteInstanceName Ignite instance name. + * @param name Worker name. + * @param log Logger. + */ + public IndexWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) { + super(igniteInstanceName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + startLatch.countDown(); + + // TODO: Do actual create/drop. + } + + /** + * Await start. + */ + private void awaitStart() { + try { + startLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException("Interrupted while waiting index operation worker start: " + + name(), e); + } + } + } +}
