This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 696fc9ac9e97e1187fb15ab2dcbe3780846e8647 Author: Michael Blow <[email protected]> AuthorDate: Sat Feb 1 22:17:30 2020 -0500 [NO ISSUE][*DB][CLUS] Ensure registering node is authorized Change-Id: I1c47e8e7934b87b758b32e186b613b33d852066d Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4863 Integration-Tests: Jenkins <[email protected]> Tested-by: Michael Blow <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../app/replication/NcLifecycleCoordinator.java | 22 ++++++++++++++++------ .../asterix/runtime/ClusterStateManagerTest.java | 4 ++++ .../common/cluster/IClusterStateManager.java | 2 -- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index 18191d6..0d86cb9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -20,6 +20,7 @@ package org.apache.asterix.app.replication; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -51,8 +52,9 @@ import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.metadata.MetadataManager; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.NodeStatus; +import org.apache.hyracks.api.control.IGatekeeper; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.Level; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -61,13 +63,16 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { private static final Logger LOGGER = LogManager.getLogger(); protected IClusterStateManager clusterManager; protected volatile String metadataNodeId; - protected Set<String> pendingStartupCompletionNodes = new HashSet<>(); + protected Set<String> pendingStartupCompletionNodes = Collections.synchronizedSet(new HashSet<>()); protected final ICCMessageBroker messageBroker; private final boolean replicationEnabled; + private final IGatekeeper gatekeeper; public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) { this.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); this.replicationEnabled = replicationEnabled; + this.gatekeeper = + ((ClusterControllerService) serviceCtx.getControllerService()).getApplication().getGatekeeper(); } @Override @@ -120,7 +125,14 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { - pendingStartupCompletionNodes.remove(msg.getNodeId()); + if (!pendingStartupCompletionNodes.remove(msg.getNodeId())) { + LOGGER.warn("Received unexpected startup completion message from node {}", msg.getNodeId()); + } + if (!gatekeeper.isAuthorized(msg.getNodeId())) { + LOGGER.warn("Node {} lost authorization before startup completed; ignoring registration result", + msg.getNodeId()); + return; + } if (msg.isSuccess()) { clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters()); if (msg.getNodeId().equals(metadataNodeId)) { @@ -128,9 +140,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } clusterManager.refreshState(); } else { - if (LOGGER.isErrorEnabled()) { - LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete startup. ", msg.getException()); - } + LOGGER.error("Node {} failed to complete startup", msg.getNodeId(), msg.getException()); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index b7a8c63..9cc295e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.utils.NcLocalCounters; +import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.asterix.runtime.utils.BulkTxnIdFactory; import org.apache.asterix.runtime.utils.CcApplicationContext; @@ -211,10 +212,13 @@ public class ClusterStateManagerTest { final ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class); JobIdFactory jobIdFactory = new JobIdFactory(CcId.valueOf(0)); Mockito.when(ccs.getJobIdFactory()).thenReturn(jobIdFactory); + final CCApplication ccApplication = Mockito.mock(CCApplication.class); + Mockito.when(ccs.getApplication()).thenReturn(ccApplication); Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig); Mockito.when(iccServiceContext.getControllerService()).thenReturn(ccs); Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext); + Mockito.when(ccApplication.getGatekeeper()).thenReturn(nodeId -> true); NcLifecycleCoordinator coordinator = new NcLifecycleCoordinator(ccApplicationContext.getServiceContext(), false); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 6c39372..a37e6e4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -120,7 +120,6 @@ public interface IClusterStateManager { /** * Register the specified node partitions with the specified nodeId with this cluster state manager - * then calls {@link IClusterStateManager#refreshState()} * * @param nodeId * @param nodePartitions @@ -130,7 +129,6 @@ public interface IClusterStateManager { /** * De-register the specified node's partitions from this cluster state manager - * then calls {@link IClusterStateManager#refreshState()} * * @param nodeId * @throws HyracksDataException
