Implemented operation state management.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/41a7beac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/41a7beac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/41a7beac Branch: refs/heads/ignite-4565-ddl Commit: 41a7beac9d470d16e4aa39bd345c5ce02adbed59 Parents: c71fb3d Author: devozerov <[email protected]> Authored: Tue Mar 21 18:36:40 2017 +0300 Committer: devozerov <[email protected]> Committed: Tue Mar 21 18:36:40 2017 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 49 ++-- .../query/ddl/IndexOperationHandler.java | 7 + .../query/ddl/IndexOperationState.java | 251 +++++++++++++++++++ 3 files changed, 289 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 9d98115..92bc7c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -316,8 +316,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { } // Apply dynamic changes to candidates. + Collection<AbstractIndexOperation> ops = new ArrayList<>(); + if (initIdxStates != null) { Map<String, QueryIndexState> readyIdxStates = initIdxStates.readyOperations(); + Map<String, QueryIndexActiveOperation> acceptedOps = initIdxStates.acceptedActiveOperations(); for (QueryTypeCandidate cand : cands) { QueryTypeDescriptorImpl desc = cand.descriptor(); @@ -329,14 +332,29 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (F.eq(desc.tableName(), idxState.tableName())) QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc); } + + for (Map.Entry<String, QueryIndexActiveOperation> acceptedOpEntry : acceptedOps.entrySet()) { + String idxName = acceptedOpEntry.getKey(); + AbstractIndexOperation op = acceptedOpEntry.getValue().operation(); + + if (F.eq(desc.tableName(), op.tableName())) { + QueryIndex idx = op instanceof CreateIndexOperation ? ((CreateIndexOperation)op).index() : null; + + QueryUtils.processDynamicIndexChange(idxName, idx, desc); + } + + ops.add(op); + } } } - // TODO: Apply pending operations right away! - // Ready to register at this point. registerCache0(space, cctx, cands); + // If cache was registered successfully, start pending operations. + for (AbstractIndexOperation op : ops) + startIndexOperation(op, true, null); + // Warn about possible implicit deserialization. if (!mustDeserializeClss.isEmpty()) { U.warn(log, "Some classes in query configuration cannot be written in binary format " + @@ -348,22 +366,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { } } - /** - * Find current coordinator. - * - * @return {@code True} if node is coordinator. - */ - private ClusterNode findCoordinator() { - ClusterNode res = null; - - for (ClusterNode node : ctx.discovery().aliveServerNodes()) { - if (res == null || res.order() > node.order()) - res = node; - } - - return res; - } - /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { if (idx != null) @@ -1308,6 +1310,17 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Start index operation. + * + * @param op Operation. + * @param completed Completed flag. + * @param err Error. + */ + private void startIndexOperation(AbstractIndexOperation op, boolean completed, Exception err) { + // TODO + } + + /** * Process status request. * * @param req Status request. http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java index 295a0022..7825877 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java @@ -123,6 +123,13 @@ public class IndexOperationHandler { } /** + * @return Operation. + */ + public AbstractIndexOperation operation() { + return op; + } + + /** * @return Future completed when operation is ready. */ public IgniteInternalFuture future() { http://git-wip-us.apache.org/repos/asf/ignite/blob/41a7beac/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java new file mode 100644 index 0000000..75ab2e7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.ddl; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.UUID; + +import static org.apache.ignite.internal.GridTopic.TOPIC_DYNAMIC_SCHEMA; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** + * Current index operation state. + */ +public class IndexOperationState { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Query processor. */ + private final GridQueryProcessor qryProc; + + /** Logger. */ + private final IgniteLogger log; + + /** Operation handler. */ + private final IndexOperationHandler hnd; + + /** Mutex for concurrency control. */ + private final Object mux = new Object(); + + /** Whether node is coordinator. */ + private boolean crd; + + /** Participants. */ + private Collection<UUID> nodeIds; + + /** Node results. */ + private Map<UUID, String> nodeRess; + + /** + * Constructor. + * + * @param ctx Context. + * @param qryProc Query processor. + * @param hnd Operation handler. + */ + public IndexOperationState(GridKernalContext ctx, GridQueryProcessor qryProc, IndexOperationHandler hnd) { + this.ctx = ctx; + + log = ctx.log(IndexOperationState.class); + + this.qryProc = qryProc; + this.hnd = hnd; + } + + /** + * Map operation handling. + */ + @SuppressWarnings("unchecked") + public void tryMap() { + synchronized (mux) { + if (isLocalCoordinator()) { + // Initialize local structure. + crd = true; + nodeIds = new HashSet<>(); + nodeRess = new HashMap<>(); + + // Send remote requests. + IndexOperationStatusRequest req = + new IndexOperationStatusRequest(ctx.localNodeId(), hnd.operation().operationId()); + + for (ClusterNode alive : ctx.discovery().aliveServerNodes()) { + nodeIds.add(alive.id()); + + if (!alive.isLocal()) { + try { + // TODO: Proper pool! + ctx.io().sendToGridTopic(alive, TOPIC_DYNAMIC_SCHEMA, req, PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + // Node has left the grid. + nodeIds.remove(alive.id()); + } + } + } + + // Listen for local completion. + hnd.future().listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); + + onNodeFinished(ctx.localNodeId(), null); + } + catch (Exception e) { + onNodeFinished(ctx.localNodeId(), e.getMessage()); + } + } + }); + } + } + } + + /** + * Handle node finish. + * + * @param nodeId Node ID. + * @param errMsg Error message. + */ + public void onNodeFinished(UUID nodeId, String errMsg) { + synchronized (mux) { + if (nodeRess.containsKey(nodeId)) + return; + + nodeRess.put(nodeId, errMsg); + + checkFinished(); + } + } + + /** + * Handle node leave event. + * + * @param nodeId Node ID. + */ + public void onNodeLeave(UUID nodeId) { + synchronized (mux) { + if (crd) { + // Handle this as success. + if (nodeIds.remove(nodeId)) + nodeRess.remove(nodeId); + + checkFinished(); + } + else + // We can become coordinator, so try remap. + tryMap(); + } + } + + /** + * Handle status request. + * + * @param nodeId Node ID. + */ + @SuppressWarnings("unchecked") + public void onStatusRequest(final UUID nodeId) { + hnd.future().listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + String errMsg = null; + + try { + fut.get(); + } + catch (Exception e) { + errMsg = e.getMessage(); + } + + try { + IndexOperationStatusResponse resp = + new IndexOperationStatusResponse(ctx.localNodeId(), hnd.operation().operationId(), errMsg); + + // TODO: Proper pool! + ctx.io().sendToGridTopic(nodeId, TOPIC_DYNAMIC_SCHEMA, resp, PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + // Node left, ignore. + // TODO: Better logging all over the state and handler to simplify debug! + } + } + }); + } + + /** + * Find current coordinator. + * + * @return {@code True} if node is coordinator. + */ + private boolean isLocalCoordinator() { + ClusterNode res = null; + + for (ClusterNode node : ctx.discovery().aliveServerNodes()) { + if (res == null || res.order() > node.order()) + res = node; + } + + assert res != null; // Operation state can only exist on server nodes. + + return F.eq(ctx.localNodeId(), res.id()); + } + + /** + * Check if operation finished. + */ + private void checkFinished() { + assert Thread.holdsLock(mux); + assert crd; + + if (nodeIds.size() == nodeRess.size()) { + // Initiate finish request. + UUID errNodeId = null; + String errNodeMsg = null; + + for (Map.Entry<UUID, String> nodeRes : nodeRess.entrySet()) { + if (nodeRes.getValue() != null) { + errNodeId = nodeRes.getKey(); + errNodeMsg = nodeRes.getValue(); + + break; + } + } + + IndexFinishDiscoveryMessage msg = new IndexFinishDiscoveryMessage(hnd.operation(), errNodeId, errNodeMsg); + + try { + ctx.discovery().sendCustomEvent(msg); + } + catch (Exception e) { + // Failed to send finish message over discovery. This is something unrecoverable. + U.warn(log, "Failed to send index finish discovery message [op=" + hnd.operation() + ']', e); + } + } + } +} \ No newline at end of file
