Repository: asterixdb Updated Branches: refs/heads/master f94f63d3f -> e0d8e5078
[ASTERIXDB-2019][CLUS] Update cluster state on partitions changes - user model changes: no - storage format changes: no - interface changes: no Details: - Set the cluster to UNUSABLE when no partitions are registered - Update cluster state after partitions register/de-register - Reject unregistered nodes queries on CC - Avoid NPE when trying to send to a node that was de-registered Change-Id: I7d11733a1dcd86136e157d80517bff4abcfc776b Reviewed-on: https://asterix-gerrit.ics.uci.edu/1918 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e0d8e507 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e0d8e507 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e0d8e507 Branch: refs/heads/master Commit: e0d8e5078f90823e8dd51052317a7da1c08cc9f9 Parents: f94f63d Author: Murtadha Hubail <[email protected]> Authored: Fri Aug 4 19:55:49 2017 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Fri Aug 4 15:31:54 2017 -0700 ---------------------------------------------------------------------- .../message/ExecuteStatementRequestMessage.java | 33 +++++++++++++++----- .../asterix/messaging/CCMessageBroker.java | 8 ++++- .../common/cluster/IClusterStateManager.java | 11 ++++++- .../runtime/utils/ClusterStateManager.java | 15 ++++++++- 4 files changed, 57 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index fc0c1ff..9faa9e9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -92,6 +92,11 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService(); CCApplication ccApp = (CCApplication) ccSrv.getApplication(); CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker(); + final String rejectionReason = getRejectionReason(ccSrv); + if (rejectionReason != null) { + sendRejection(rejectionReason, messageBroker); + return; + } CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager(); ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang); IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider(); @@ -100,16 +105,9 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage ccSrv.getExecutor().submit(() -> { ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); - try { - final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); - if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { - throw new IllegalStateException("Cannot execute request, cluster is " + clusterState); - } - IParser parser = compilationProvider.getParserFactory().createParser(statementsText); List<Statement> statements = parser.parse(); - StringWriter outWriter = new StringWriter(256); PrintWriter outPrinter = new PrintWriter(outWriter); SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); @@ -148,6 +146,27 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage }); } + private String getRejectionReason(ClusterControllerService ccSrv) { + if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) { + return "Node is not registerted with the CC"; + } + final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); + if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { + return "Cannot execute request, cluster is " + clusterState; + } + return null; + } + + private void sendRejection(String reason, CCMessageBroker messageBroker) { + ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); + responseMsg.setError(new Exception(reason)); + try { + messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e.toString(), e); + } + } + @Override public String toString() { return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index de2ca11..0eade41 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -69,7 +69,13 @@ public class CCMessageBroker implements ICCMessageBroker { public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception { INodeManager nodeManager = ccs.getNodeManager(); NodeControllerState state = nodeManager.getNodeControllerState(nodeId); - state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); + if (state != null) { + state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); + } else { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("Couldn't send message to unregistered node (" + nodeId + ")"); + } + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index a5686fd..30675cd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -100,11 +100,20 @@ public interface IClusterStateManager { /** * Register the specified node partitions with the specified nodeId with this cluster state manager + * then calls {@link IClusterStateManager#refreshState()} + * + * @param nodeId + * @param nodePartitions + * @throws AsterixException */ void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions) throws AsterixException; /** * De-register the specified node's partitions from this cluster state manager + * then calls {@link IClusterStateManager#refreshState()} + * + * @param nodeId + * @throws HyracksDataException */ - void deregisterNodePartitions(String nodeId); + void deregisterNodePartitions(String nodeId) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 4717a7b..8156a23 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -154,6 +154,12 @@ public class ClusterStateManager implements IClusterStateManager { @Override public synchronized void refreshState() throws HyracksDataException { resetClusterPartitionConstraint(); + if (clusterPartitions.isEmpty()) { + LOGGER.info("Cluster does not have any registered partitions"); + setState(ClusterState.UNUSABLE); + return; + } + for (ClusterPartition p : clusterPartitions.values()) { if (!p.isActive()) { setState(ClusterState.UNUSABLE); @@ -368,10 +374,16 @@ public class ClusterStateManager implements IClusterStateManager { clusterPartitions.put(nodePartition.getPartitionId(), nodePartition); } node2PartitionsMap.put(nodeId, nodePartitions); + //TODO fix exception propagation from refreshState + try { + refreshState(); + } catch (HyracksDataException e) { + throw new AsterixException(e); + } } @Override - public synchronized void deregisterNodePartitions(String nodeId) { + public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException { ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId); if (nodePartitions == null) { LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)"); @@ -382,6 +394,7 @@ public class ClusterStateManager implements IClusterStateManager { for (ClusterPartition nodePartition : nodePartitions) { clusterPartitions.remove(nodePartition.getPartitionId()); } + refreshState(); } }
