Added operation handler.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2a3ded Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2a3ded Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2a3ded Branch: refs/heads/ignite-4565-ddl Commit: 1b2a3dedb43f4950e3ac67204b717bc44460d28c Parents: ee37450 Author: devozerov <[email protected]> Authored: Mon Mar 20 15:48:56 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Mar 20 15:48:56 2017 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 11 ++++ .../ddl/IndexAbstractDiscoveryMessage.java | 17 ------- .../ddl/IndexOperationCancellationToken.java | 53 ++++++++++++++++++++ .../query/ddl/IndexOperationHandler.java | 37 +++++++------- 4 files changed, 84 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 7fe83ab..517a30d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation; import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation; import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage; import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage; +import org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken; import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage; import org.apache.ignite.internal.processors.query.ddl.task.IndexingAcceptTask; import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStartTask; @@ -461,6 +462,16 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Process index operation. + * + * @param op Operation. + * @param cancelToken Cancel token. + */ + public void processIndexOperation(AbstractIndexOperation op, IndexOperationCancellationToken cancelToken) { + // TODO. + } + + /** * Register cache in indexing SPI. * * @param space Space. http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java index 11d8f93..3de525b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java @@ -41,9 +41,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe /** Whether request must be propagated to exchange worker for final processing. */ private transient boolean exchange; - /** Local cache index state at the moment of message receive. */ - private transient QueryIndexStates idxStates; - /** * Constructor. * @@ -66,20 +63,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe } /** - * @return Index states. - */ - @Nullable public QueryIndexStates indexStates() { - return idxStates; - } - - /** - * @param idxStates Index states. - */ - public void indexStates(QueryIndexStates idxStates) { - this.idxStates = idxStates; - } - - /** * @return Whether request must be propagated to exchange worker for final processing. */ public boolean exchange() { http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java new file mode 100644 index 0000000..e8b2c2b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.ddl; + +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Index operation cancellation token. + */ +public class IndexOperationCancellationToken { + /** Cancel flag. */ + private final AtomicBoolean flag = new AtomicBoolean(); + + /** + * Get cancel state. + * + * @return {@code True} if cancelled. + */ + public boolean isCancelled() { + return flag.get(); + } + + /** + * Do cancel. + * + * @return {@code True} if cancel flag was set by this call. + */ + public boolean cancel() { + return flag.compareAndSet(false, true); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IndexOperationCancellationToken.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java index 6932724..116b613 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java @@ -51,12 +51,12 @@ public class IndexOperationHandler { /** Mutex for concurrent access. */ private final Object mux = new Object(); + /** Cancellation token. */ + private final IndexOperationCancellationToken cancelToken = new IndexOperationCancellationToken(); + /** Init flag. */ private boolean init; - /** Cancel flag. */ - private boolean cancel; - /** Worker. */ private IndexWorker worker; @@ -84,7 +84,7 @@ public class IndexOperationHandler { if (!init) { init = true; - if (!cancel) { + if (!cancelToken.isCancelled()) { worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log); new IgniteThread(worker).start(); @@ -96,29 +96,25 @@ public class IndexOperationHandler { } /** - * @return Worker name. - */ - private String workerName() { - return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName(); - } - - /** * Cancel operation. */ public void cancel() { synchronized (mux) { - if (!cancel) { - cancel = true; - + if (!cancelToken.cancel()) { if (worker != null) worker.cancel(); } - - // TODO } } /** + * @return Worker name. + */ + private String workerName() { + return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName(); + } + + /** * Single-shot index worker responsible for operation execution. */ private class IndexWorker extends GridWorker { @@ -140,7 +136,14 @@ public class IndexOperationHandler { @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { startLatch.countDown(); - // TODO: Do actual create/drop. + try { + qryProc.processIndexOperation(op, cancelToken); + + opFut.onDone(); + } + catch (Exception e) { + opFut.onDone(e); + } } /**
