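Wired things up: GridQueryProcessor now validates accepted index operations, tracks their IndexOperationState, and handles finish, node-leave, and status request/response messages.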
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/edaf29af Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/edaf29af Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/edaf29af Branch: refs/heads/ignite-4565-ddl Commit: edaf29af95e1503d95001d1bede5a3d52d120ca5 Parents: 013faff Author: devozerov <[email protected]> Authored: Wed Mar 22 11:30:55 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed Mar 22 11:30:55 2017 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 223 +++++++++++++------ .../query/ddl/IndexOperationState.java | 12 +- 2 files changed, 153 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/edaf29af/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 92bc7c1..2e5ee50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -47,9 +47,12 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation; import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation; +import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation; import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage; import 
org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage; import org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken; +import org.apache.ignite.internal.processors.query.ddl.IndexOperationHandler; +import org.apache.ignite.internal.processors.query.ddl.IndexOperationState; import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusRequest; import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusResponse; import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage; @@ -89,6 +92,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.internal.GridTopic.TOPIC_DYNAMIC_SCHEMA; import static org.apache.ignite.internal.IgniteComponentType.INDEXING; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; /** * Indexing processor. @@ -127,6 +131,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** Index create/drop client futures. */ private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new ConcurrentHashMap<>(); + /** Index operation states. */ + private final ConcurrentHashMap<UUID, IndexOperationState> idxOpStates = new ConcurrentHashMap<>(); + /** IO message listener. */ private final GridMessageListener ioLsnr; @@ -425,73 +432,90 @@ public class GridQueryProcessor extends GridProcessorAdapter { public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) { idxLock.writeLock().lock(); - // TODO - try { -// // Validate. -// if (op instanceof CreateIndexOperation) { -// CreateIndexOperation op0 = (CreateIndexOperation)op; -// -// QueryIndex idx = op0.index(); -// -// // Check conflict with other indexes. 
-// String idxName = op0.index().getName(); -// -// QueryIndexKey idxKey = new QueryIndexKey(space, idxName); -// -// QueryIndexDescriptorImpl oldIdx = idxs.get(idxKey); -// -// if (oldIdx != null) { -// if (!op0.ifNotExists()) -// msg.onError(ctx.localNodeId(), "Index already exists [space=" + space + ", index=" + idxName); -// -// return; -// } -// -// // Make sure table exists. -// String tblName = op0.tableName(); -// -// QueryTypeDescriptorImpl typeDesc = null; -// -// for (QueryTypeDescriptorImpl type : types.values()) { -// if (F.eq(tblName, type.tableName())) { -// typeDesc = type; -// -// break; -// } -// } -// -// if (typeDesc == null) { -// msg.onError(ctx.localNodeId(), "Table doesn't exist: " + tblName); -// -// return; -// } -// -// // Make sure that index can be applied to the given table. -// for (String idxField : idx.getFieldNames()) { -// if (!typeDesc.fields().containsKey(idxField)) { -// msg.onError(ctx.localNodeId(), "Field doesn't exist: " + idxField); -// -// return; -// } -// } -// } -// else if (op instanceof DropIndexOperation) { -// DropIndexOperation op0 = (DropIndexOperation)op; -// -// String idxName = op0.indexName(); -// -// QueryIndexKey idxKey = new QueryIndexKey(space, idxName); -// -// QueryIndexDescriptorImpl oldIdx = idxs.get(idxKey); -// -// if (oldIdx == null) { -// if (!op0.ifExists()) -// msg.onError(ctx.localNodeId(), "Index doesn't exist: " + idxName); -// } -// } -// else -// msg.onError(ctx.localNodeId(), "Unsupported operation: " + op); + AbstractIndexOperation op = msg.operation(); + String space = op.space(); + + boolean completed = false; + String errMsg = null; + + // Validate. + if (op instanceof CreateIndexOperation) { + CreateIndexOperation op0 = (CreateIndexOperation)op; + + QueryIndex idx = op0.index(); + + // Make sure table exists. 
+ String tblName = op0.tableName(); + + QueryTypeDescriptorImpl type0 = null; + + for (QueryTypeDescriptorImpl type : types.values()) { + if (F.eq(tblName, type.tableName())) { + type0 = type; + + break; + } + } + + if (type0 == null) { + completed = true; + errMsg = "Table doesn't exist: " + tblName; + } + else { + // Make sure that index can be applied to the given table. + for (String idxField : idx.getFieldNames()) { + if (!type0.fields().containsKey(idxField)) { + completed = true; + errMsg = "Field doesn't exist: " + idxField; + } + } + } + + // Check conflict with other indexes. + if (errMsg != null) { + String idxName = op0.index().getName(); + + QueryIndexKey idxKey = new QueryIndexKey(space, idxName); + + if (idxs.get(idxKey) != null) { + completed = true; + + if (!op0.ifNotExists()) + errMsg = "Index already exists [space=" + space + ", index=" + idxName + ']'; + } + } + } + else if (op instanceof DropIndexOperation) { + DropIndexOperation op0 = (DropIndexOperation)op; + + String idxName = op0.indexName(); + + QueryIndexDescriptorImpl oldIdx = idxs.get(new QueryIndexKey(space, idxName)); + + if (oldIdx == null) { + completed = true; + + if (!op0.ifExists()) + errMsg = "Index doesn't exist: " + idxName; + } + else { + // Make sure that index relate to expected table. + if (F.eq(oldIdx.typeDescriptor().tableName(), op.tableName())) { + completed = true; + errMsg = "Index doesn't exist: " + idxName; + } + } + } + else { + completed = true; + errMsg = "Unsupported operation: " + op; + } + + // Start async operation. + Exception err = errMsg != null ? new IgniteException(errMsg) : null; + + startIndexOperation(op, completed, err); } finally { idxLock.writeLock().unlock(); @@ -499,12 +523,26 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Handle index accept message. + * Handle index finish message. * * @param msg Message. 
*/ public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) { + UUID opId = msg.operation().operationId(); + + idxOpStates.remove(opId); + + QueryIndexClientFuture cliFut = idxCliFuts.remove(opId); + + if (cliFut != null) { + if (msg.hasError()) { + IgniteException err = new IgniteException(msg.errorMessage()); + cliFut.onDone(err); // TODO: Better message and code handling. + } + else + cliFut.onDone(); + } } /** @@ -513,7 +551,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param node Node. */ public void onNodeLeave(ClusterNode node) { - // TODO. + for (IndexOperationState idxOpState : idxOpStates.values()) + idxOpState.onNodeLeave(node.id()); } /** @@ -1317,7 +1356,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param err Error. */ private void startIndexOperation(AbstractIndexOperation op, boolean completed, Exception err) { - // TODO + IndexOperationHandler hnd = new IndexOperationHandler(ctx, this, op, completed, err); + + hnd.init(); + + IndexOperationState state = new IndexOperationState(ctx, this, hnd); + + idxOpStates.put(op.operationId(), state); + + state.tryMap(); } /** @@ -1326,7 +1373,35 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param req Status request. */ private void processStatusRequest(IndexOperationStatusRequest req) { - // TODO + UUID opId = req.operationId(); + + IndexOperationState idxOpState = idxOpStates.get(opId); + + if (idxOpState != null) + idxOpState.onStatusRequest(req.senderNodeId()); + else + // Operation completed successfully. + sendStatusResponse(req.senderNodeId(), opId, null); + } + + /** + * Send status response. + * + * @param destNodeId Destination node ID. + * @param opId Operation ID. + * @param errMsg Error message. + */ + public void sendStatusResponse(UUID destNodeId, UUID opId, String errMsg) { + try { + IndexOperationStatusResponse resp = new IndexOperationStatusResponse(ctx.localNodeId(), opId, errMsg); + + // TODO: Proper pool! 
+ ctx.io().sendToGridTopic(destNodeId, TOPIC_DYNAMIC_SCHEMA, resp, PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + // Node left, ignore. + // TODO: Better logging all over the state and handler to simplify debug! + } } /** @@ -1335,7 +1410,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param resp Status response. */ private void processStatusResponse(IndexOperationStatusResponse resp) { - // TODO + IndexOperationState idxOpState = idxOpStates.get(resp.operationId()); + + if (idxOpState != null) + idxOpState.onNodeFinished(resp.senderNodeId(), resp.errorMessage()); + else { + // TODO: Log! + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/edaf29af/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java index 75ab2e7..88ba5cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java @@ -183,17 +183,7 @@ public class IndexOperationState { errMsg = e.getMessage(); } - try { - IndexOperationStatusResponse resp = - new IndexOperationStatusResponse(ctx.localNodeId(), hnd.operation().operationId(), errMsg); - - // TODO: Proper pool! - ctx.io().sendToGridTopic(nodeId, TOPIC_DYNAMIC_SCHEMA, resp, PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - // Node left, ignore. - // TODO: Better logging all over the state and handler to simplify debug! - } + qryProc.sendStatusResponse(nodeId, hnd.operation().operationId(), errMsg); } }); }
