http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java deleted file mode 100644 index c79524a..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java +++ /dev/null @@ -1,679 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.runtime.util; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; -import org.apache.asterix.event.schema.cluster.Cluster; -import org.apache.asterix.event.schema.cluster.Node; -import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage; -import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage; -import org.apache.asterix.runtime.message.NodeFailbackPlan; -import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState; -import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage; -import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage; -import org.apache.asterix.runtime.message.ReplicaEventMessage; -import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage; -import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage; -import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage; -import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; -import 
org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.HyracksException; - -import com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * A holder class for properties related to the Asterix cluster. - */ - -public class ClusterStateManager { - /* - * TODO: currently after instance restarts we require all nodes to join again, - * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance - * shutdown and using it on startup to identify the nodes that are expected the join. - */ - - private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName()); - public static final ClusterStateManager INSTANCE = new ClusterStateManager(); - private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address"; - private static final String IO_DEVICES = "iodevices"; - private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>(); - - private final Cluster cluster; - private ClusterState state = ClusterState.UNUSABLE; - - private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint; - - private boolean globalRecoveryCompleted = false; - - private Map<String, ClusterPartition[]> node2PartitionsMap = null; - private SortedMap<Integer, ClusterPartition> clusterPartitions = null; - private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null; - - private long clusterRequestId = 0; - private String currentMetadataNode = null; - private boolean metadataNodeActive = false; - private boolean autoFailover = false; - private boolean replicationEnabled = false; - private Set<String> failedNodes = new HashSet<>(); - private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans; - private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap; - - private ClusterStateManager() { - cluster = ClusterProperties.INSTANCE.getCluster(); - // if this is the CC process - if (AppContextInfo.INSTANCE.initialized() - 
&& AppContextInfo.INSTANCE.getCCApplicationContext() != null) { - node2PartitionsMap = AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions(); - clusterPartitions = AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions(); - currentMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName(); - replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); - autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled(); - if (autoFailover) { - pendingTakeoverRequests = new HashMap<>(); - pendingProcessingFailbackPlans = new LinkedList<>(); - planId2FailbackPlanMap = new HashMap<>(); - } - } - } - - public synchronized void removeNCConfiguration(String nodeId) throws HyracksException { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Removing configuration parameters for node id " + nodeId); - } - activeNcConfiguration.remove(nodeId); - - //if this node was waiting for failback and failed before it completed - if (failedNodes.contains(nodeId)) { - if (autoFailover) { - notifyFailbackPlansNodeFailure(nodeId); - revertFailedFailbackPlanEffects(); - } - } else { - //an active node failed - failedNodes.add(nodeId); - if (nodeId.equals(currentMetadataNode)) { - metadataNodeActive = false; - LOGGER.info("Metadata node is now inactive"); - } - updateNodePartitions(nodeId, false); - if (replicationEnabled) { - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE); - if (autoFailover) { - notifyFailbackPlansNodeFailure(nodeId); - requestPartitionsTakeover(nodeId); - } - } - } - } - - public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) - throws HyracksException { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Registering configuration parameters for node id " + nodeId); - } - activeNcConfiguration.put(nodeId, configuration); - - //a node trying to come back after failure - if (failedNodes.contains(nodeId)) { - if (autoFailover) { - 
prepareFailbackPlan(nodeId); - return; - } else { - //a node completed local or remote recovery and rejoined - failedNodes.remove(nodeId); - if (replicationEnabled) { - //notify other replica to reconnect to this node - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); - } - } - } - - if (nodeId.equals(currentMetadataNode)) { - metadataNodeActive = true; - LOGGER.info("Metadata node is now active"); - } - updateNodePartitions(nodeId, true); - } - - private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException { - ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId); - // if this isn't a storage node, it will not have cluster partitions - if (nodePartitions != null) { - for (ClusterPartition p : nodePartitions) { - // set the active node for this node's partitions - p.setActive(added); - if (added) { - p.setActiveNodeId(nodeId); - } - } - resetClusterPartitionConstraint(); - updateClusterState(); - } - } - - private synchronized void updateClusterState() throws HyracksDataException { - for (ClusterPartition p : clusterPartitions.values()) { - if (!p.isActive()) { - state = ClusterState.UNUSABLE; - LOGGER.info("Cluster is in UNUSABLE state"); - return; - } - } - // if all storage partitions are active as well as the metadata node, then the cluster is active - if (metadataNodeActive) { - state = ClusterState.PENDING; - LOGGER.info("Cluster is now " + state); - AppContextInfo.INSTANCE.getMetadataBootstrap().init(); - state = ClusterState.ACTIVE; - LOGGER.info("Cluster is now " + state); - // start global recovery - AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery(); - if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) { - processPendingFailbackPlans(); - } - } else { - requestMetadataNodeTakeover(); - } - } - - /** - * Returns the IO devices configured for a Node Controller - * - * @param nodeId - * unique identifier of the Node Controller - * @return a list of IO 
devices. - */ - public synchronized String[] getIODevices(String nodeId) { - Map<String, String> ncConfig = activeNcConfiguration.get(nodeId); - if (ncConfig == null) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Configuration parameters for nodeId " + nodeId - + " not found. The node has not joined yet or has left."); - } - return new String[0]; - } - return ncConfig.get(IO_DEVICES).split(","); - } - - public ClusterState getState() { - return state; - } - - public synchronized Node getAvailableSubstitutionNode() { - List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode(); - return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0); - } - - public synchronized Set<String> getParticipantNodes() { - Set<String> participantNodes = new HashSet<>(); - for (String pNode : activeNcConfiguration.keySet()) { - participantNodes.add(pNode); - } - return participantNodes; - } - - public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() { - if (clusterPartitionConstraint == null) { - resetClusterPartitionConstraint(); - } - return clusterPartitionConstraint; - } - - private synchronized void resetClusterPartitionConstraint() { - ArrayList<String> clusterActiveLocations = new ArrayList<>(); - for (ClusterPartition p : clusterPartitions.values()) { - if (p.isActive()) { - clusterActiveLocations.add(p.getActiveNodeId()); - } - } - clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint( - clusterActiveLocations.toArray(new String[] {})); - } - - public boolean isGlobalRecoveryCompleted() { - return globalRecoveryCompleted; - } - - public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) { - this.globalRecoveryCompleted = globalRecoveryCompleted; - } - - public boolean isClusterActive() { - if (cluster == null) { - // this is a virtual cluster - return true; - } - return state == ClusterState.ACTIVE; - } - - public static int getNumberOfNodes() { - 
return AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size(); - } - - public synchronized ClusterPartition[] getNodePartitions(String nodeId) { - return node2PartitionsMap.get(nodeId); - } - - public synchronized int getNodePartitionsCount(String node) { - if (node2PartitionsMap.containsKey(node)) { - return node2PartitionsMap.get(node).length; - } - return 0; - } - - public synchronized ClusterPartition[] getClusterPartitons() { - ArrayList<ClusterPartition> partitons = new ArrayList<>(); - for (ClusterPartition partition : clusterPartitions.values()) { - partitons.add(partition); - } - return partitons.toArray(new ClusterPartition[] {}); - } - - private synchronized void requestPartitionsTakeover(String failedNodeId) { - //replica -> list of partitions to takeover - Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>(); - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); - - //collect the partitions of the failed NC - List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId); - if (!lostPartitions.isEmpty()) { - for (ClusterPartition partition : lostPartitions) { - //find replicas for this partitions - Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId()); - //find a replica that is still active - for (String replica : partitionReplicas) { - //TODO (mhubail) currently this assigns the partition to the first found active replica. - //It needs to be modified to consider load balancing. 
- if (addActiveReplica(replica, partition, partitionRecoveryPlan)) { - break; - } - } - } - - if (partitionRecoveryPlan.size() == 0) { - //no active replicas were found for the failed node - LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions); - return; - } else { - LOGGER.info("Partitions to recover: " + lostPartitions); - } - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - //For each replica, send a request to takeover the assigned partitions - for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) { - String replica = entry.getKey(); - Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]); - long requestId = clusterRequestId++; - TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, - replica, partitionsToTakeover); - pendingTakeoverRequests.put(requestId, takeoverRequest); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, replica); - } catch (Exception e) { - /** - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, we will send any pending request - * that belongs to the failed NC to a different active replica. 
- */ - LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e); - } - } - } - } - - private boolean addActiveReplica(String replica, ClusterPartition partition, - Map<String, List<Integer>> partitionRecoveryPlan) { - if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) { - if (!partitionRecoveryPlan.containsKey(replica)) { - List<Integer> replicaPartitions = new ArrayList<>(); - replicaPartitions.add(partition.getPartitionId()); - partitionRecoveryPlan.put(replica, replicaPartitions); - } else { - partitionRecoveryPlan.get(replica).add(partition.getPartitionId()); - } - return true; - } - return false; - } - - private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) { - List<ClusterPartition> nodePartitions = new ArrayList<>(); - for (ClusterPartition partition : clusterPartitions.values()) { - if (partition.getActiveNodeId().equals(nodeId)) { - nodePartitions.add(partition); - } - } - /** - * if there is any pending takeover request that this node was supposed to handle, - * it needs to be sent to a different replica - */ - List<Long> failedTakeoverRequests = new ArrayList<>(); - for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) { - if (request.getNodeId().equals(nodeId)) { - for (Integer partitionId : request.getPartitions()) { - nodePartitions.add(clusterPartitions.get(partitionId)); - } - failedTakeoverRequests.add(request.getRequestId()); - } - } - - //remove failed requests - for (Long requestId : failedTakeoverRequests) { - pendingTakeoverRequests.remove(requestId); - } - return nodePartitions; - } - - private synchronized void requestMetadataNodeTakeover() { - //need a new node to takeover metadata node - ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties() - .getMetadataPartition(); - //request the metadataPartition node to register itself as the metadata node - TakeoverMetadataNodeRequestMessage 
takeoverRequest = new TakeoverMetadataNodeRequestMessage(); - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId()); - } catch (Exception e) { - /** - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, a new NC will be assigned to - * the metadata partition and a new metadata node takeover request will be sent to it. - */ - LOGGER.log(Level.WARNING, - "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e); - } - } - - public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response) - throws HyracksDataException { - for (Integer partitonId : response.getPartitions()) { - ClusterPartition partition = clusterPartitions.get(partitonId); - partition.setActive(true); - partition.setActiveNodeId(response.getNodeId()); - } - pendingTakeoverRequests.remove(response.getRequestId()); - resetClusterPartitionConstraint(); - updateClusterState(); - } - - public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response) - throws HyracksDataException { - currentMetadataNode = response.getNodeId(); - metadataNodeActive = true; - LOGGER.info("Current metadata node: " + currentMetadataNode); - updateClusterState(); - } - - private synchronized void prepareFailbackPlan(String failingBackNodeId) { - NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId); - pendingProcessingFailbackPlans.add(plan); - planId2FailbackPlanMap.put(plan.getPlanId(), plan); - - //get all partitions this node requires to resync - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); - Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId); - 
for (String replicaId : nodeReplicas) { - ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId); - for (ClusterPartition partition : nodePartitions) { - plan.addParticipant(partition.getActiveNodeId()); - /** - * if the partition original node is the returning node, - * add it to the list of the partitions which will be failed back - */ - if (partition.getNodeId().equals(failingBackNodeId)) { - plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId()); - } - } - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Prepared Failback plan: " + plan.toString()); - } - - processPendingFailbackPlans(); - } - - private synchronized void processPendingFailbackPlans() { - /** - * if the cluster state is not ACTIVE, then failbacks should not be processed - * since some partitions are not active - */ - if (state == ClusterState.ACTIVE) { - while (!pendingProcessingFailbackPlans.isEmpty()) { - //take the first pending failback plan - NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop(); - /** - * A plan at this stage will be in one of two states: - * 1. PREPARING -> the participants were selected but we haven't sent any request. - * 2. 
PENDING_ROLLBACK -> a participant failed before we send any requests - */ - if (plan.getState() == FailbackPlanState.PREPARING) { - //set the partitions that will be failed back as inactive - String failbackNode = plan.getNodeId(); - for (Integer partitionId : plan.getPartitionsToFailback()) { - ClusterPartition clusterPartition = clusterPartitions.get(partitionId); - clusterPartition.setActive(false); - //partition expected to be returned to the failing back node - clusterPartition.setActiveNodeId(failbackNode); - } - - /** - * if the returning node is the original metadata node, - * then metadata node will change after the failback completes - */ - String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties() - .getMetadataNodeName(); - if (originalMetadataNode.equals(failbackNode)) { - plan.setNodeToReleaseMetadataManager(currentMetadataNode); - currentMetadataNode = ""; - metadataNodeActive = false; - } - - //force new jobs to wait - state = ClusterState.REBALANCING; - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE - .getCCApplicationContext().getMessageBroker(); - handleFailbackRequests(plan, messageBroker); - /** - * wait until the current plan is completed before processing the next plan. - * when the current one completes or is reverted, the cluster state will be - * ACTIVE again, and the next failback plan (if any) will be processed. 
- */ - break; - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //this plan failed before sending any requests -> nothing to rollback - planId2FailbackPlanMap.remove(plan.getPlanId()); - } - } - } - } - - private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) { - //send requests to other nodes to complete on-going jobs and prepare partitions for failback - for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) { - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeID()); - plan.addPendingRequest(request); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e); - plan.notifyNodeFailure(request.getNodeID()); - revertFailedFailbackPlanEffects(); - break; - } - } - } - - public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) { - NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId()); - plan.markRequestCompleted(msg.getRequestId()); - /** - * A plan at this stage will be in one of three states: - * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait). - * 2. PENDING_COMPLETION -> all responses received (time to send completion request). - * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert). 
- */ - if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) { - CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage(); - - //send complete resync and takeover partitions to the failing back node - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeId()); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e); - notifyFailbackPlansNodeFailure(request.getNodeId()); - revertFailedFailbackPlanEffects(); - } - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - revertFailedFailbackPlanEffects(); - } - } - - public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response) - throws HyracksDataException { - /** - * the failback plan completed successfully: - * Remove all references to it. - * Remove the the failing back node from the failed nodes list. - * Notify its replicas to reconnect to it. - * Set the failing back node partitions as active. 
- */ - NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId()); - String nodeId = plan.getNodeId(); - failedNodes.remove(nodeId); - //notify impacted replicas they can reconnect to this node - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); - updateNodePartitions(nodeId, true); - } - - private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) { - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); - Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId); - String nodeIdAddress = ""; - //in case the node joined with a new IP address, we need to send it to the other replicas - if (event == ClusterEventType.NODE_JOIN) { - nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY); - } - - ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event); - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - for (String replica : remoteReplicas) { - //if the remote replica is alive, send the event - if (activeNcConfiguration.containsKey(replica)) { - try { - messageBroker.sendApplicationMessageToNC(msg, replica); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e); - } - } - } - } - - private synchronized void revertFailedFailbackPlanEffects() { - Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); - while (iterator.hasNext()) { - NodeFailbackPlan plan = iterator.next(); - if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //TODO if the failing back node is still active, notify it to construct a new plan for it - iterator.remove(); - - //reassign the partitions that were supposed to be failed back to an active replica - requestPartitionsTakeover(plan.getNodeId()); - } - } - } - - private synchronized void 
notifyFailbackPlansNodeFailure(String nodeId) { - Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); - while (iterator.hasNext()) { - NodeFailbackPlan plan = iterator.next(); - plan.notifyNodeFailure(nodeId); - } - } - - public synchronized boolean isMetadataNodeActive() { - return metadataNodeActive; - } - - public synchronized ObjectNode getClusterStateDescription() { - ObjectMapper om = new ObjectMapper(); - ObjectNode stateDescription = om.createObjectNode(); - stateDescription.put("state", state.name()); - stateDescription.put("metadata_node", currentMetadataNode); - ArrayNode ncs = om.createArrayNode(); - stateDescription.set("ncs",ncs); - for (Map.Entry<String, ClusterPartition[]> entry : node2PartitionsMap.entrySet()) { - ObjectNode nodeJSON = om.createObjectNode(); - nodeJSON.put("node_id", entry.getKey()); - boolean allActive = true; - boolean anyActive = false; - Set<Map<String, Object>> partitions = new HashSet<>(); - for (ClusterPartition part : entry.getValue()) { - HashMap<String, Object> partition = new HashMap<>(); - partition.put("partition_id", "partition_" + part.getPartitionId()); - partition.put("active", part.isActive()); - partitions.add(partition); - allActive = allActive && part.isActive(); - if (allActive) { - anyActive = true; - } - } - nodeJSON.put("state", failedNodes.contains(entry.getKey()) ? "FAILED" - : allActive ? "ACTIVE" - : anyActive ? "PARTIALLY_ACTIVE" - : "INACTIVE"); - nodeJSON.putPOJO("partitions", partitions); - ncs.add(nodeJSON); - } - return stateDescription; - } - - public synchronized ObjectNode getClusterStateSummary() { - ObjectMapper om = new ObjectMapper(); - ObjectNode stateDescription = om.createObjectNode(); - stateDescription.put("state", state.name()); - stateDescription.putPOJO("metadata_node", currentMetadataNode); - stateDescription.putPOJO("partitions", clusterPartitions); - return stateDescription; - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java deleted file mode 100644 index 5b4daa6..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.runtime.util; - -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; -import org.apache.hyracks.storage.common.IStorageManagerInterface; -import org.apache.hyracks.storage.common.buffercache.IBufferCache; -import org.apache.hyracks.storage.common.file.IFileMapProvider; -import org.apache.hyracks.storage.common.file.ILocalResourceRepository; -import org.apache.hyracks.storage.common.file.IResourceIdFactory; - -public class RuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface, - ILSMIOOperationSchedulerProvider { - - private static final long serialVersionUID = 1L; - - public static final RuntimeComponentsProvider RUNTIME_PROVIDER = new RuntimeComponentsProvider(); - - private RuntimeComponentsProvider() { - } - - @Override - public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getLSMIOScheduler(); - } - - @Override - public IBufferCache getBufferCache(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getBufferCache(); - } - - @Override - public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getFileMapManager(); - } - - @Override - public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getLocalResourceRepository(); - } - - @Override - public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getDatasetLifecycleManager(); - } - - @Override - public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getResourceIdFactory(); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java deleted file mode 100644 index 608def7..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package org.apache.asterix.runtime.util;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.cluster.INodeManager;

/**
 * Utility class for obtaining information on the set of Hyracks NodeController
 * processes that are running on a given host.
 */
public class RuntimeUtils {

    private RuntimeUtils() {
    }

    /**
     * Returns the IDs of the Node Controllers running on the given IP address.
     *
     * @param ipAddress the host address to look up
     * @return the set of NC ids on that address; empty (never {@code null}) when none are known
     * @throws HyracksDataException if the node map cannot be obtained
     */
    public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws HyracksDataException {
        // Avoid returning a null collection when the address hosts no NCs.
        return getNodeControllerMap().getOrDefault(ipAddress, Collections.emptySet());
    }

    /**
     * Returns the IDs of all Node Controllers in the cluster, flattened across hosts.
     *
     * @throws HyracksDataException if the node map cannot be obtained
     */
    public static List<String> getAllNodeControllers() throws HyracksDataException {
        Collection<Set<String>> nodeControllersCollection = getNodeControllerMap().values();
        List<String> nodeControllers = new ArrayList<>();
        for (Set<String> ncCollection : nodeControllersCollection) {
            nodeControllers.addAll(ncCollection);
        }
        return nodeControllers;
    }

    /**
     * Builds a fresh map from host IP address to the set of NC ids on that host,
     * as reported by the CC application context.
     *
     * @throws HyracksDataException if the CC context lookup fails
     */
    public static Map<InetAddress, Set<String>> getNodeControllerMap() throws HyracksDataException {
        Map<InetAddress, Set<String>> map = new HashMap<>();
        AppContextInfo.INSTANCE.getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
        return map;
    }

    /**
     * Populates the supplied map with the CC node manager's IP-address-to-NC-name map.
     * NOTE(review): unlike {@link #getNodeControllerMap()}, this variant goes through the
     * controller service's {@link INodeManager} rather than the CC context — presumably
     * equivalent sources; confirm before unifying.
     */
    public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) {
        ClusterControllerService ccs = (ClusterControllerService) AppContextInfo.INSTANCE
                .getCCApplicationContext().getControllerService();
        INodeManager nodeManager = ccs.getNodeManager();
        map.putAll(nodeManager.getIpAddressNodeNameMap());
    }
}
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java new file mode 100644 index 0000000..355f503 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.asterix.runtime.utils;

import java.io.IOException;
import java.util.function.Supplier;
import java.util.logging.Logger;

import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.FeedProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.config.PropertiesAccessor;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.dataflow.IApplicationContextInfo;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.common.IStorageManager;

/*
 * Acts as an holder class for IndexRegistryProvider, AsterixStorageManager
 * instances that are accessed from the NCs. In addition an instance of ICCApplicationContext
 * is stored for access by the CC.
 */
public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvider {

    /** Process-wide singleton; populated once via {@link #initialize}. */
    public static final AppContextInfo INSTANCE = new AppContextInfo();
    private ICCApplicationContext appCtx;
    private IGlobalRecoveryManager globalRecoveryManager;
    private ILibraryManager libraryManager;
    private IResourceIdManager resourceIdManager;
    private CompilerProperties compilerProperties;
    private ExternalProperties externalProperties;
    private MetadataProperties metadataProperties;
    private StorageProperties storageProperties;
    private TransactionProperties txnProperties;
    private FeedProperties feedProperties;
    private BuildProperties buildProperties;
    private ReplicationProperties replicationProperties;
    private ExtensionProperties extensionProperties;
    private MessagingProperties messagingProperties;
    private Supplier<IMetadataBootstrap> metadataBootstrapSupplier;
    private IHyracksClientConnection hcc;
    private Object extensionManager;
    // volatile so non-synchronized readers of initialized() see the flag promptly
    private volatile boolean initialized = false;

    private AppContextInfo() {
    }

    /**
     * One-time initialization of the singleton from the CC application context.
     * Loads all property groups via a {@link PropertiesAccessor} and wires the
     * supplied managers. May only be called once per process.
     *
     * @throws AsterixException if already initialized
     * @throws IOException if the configuration cannot be read
     */
    public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
            ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
            Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager)
            throws AsterixException, IOException {
        if (INSTANCE.initialized) {
            throw new AsterixException(AppContextInfo.class.getSimpleName() + " has been initialized already");
        }
        // NOTE(review): the flag is set before initialization completes, acting as a
        // re-entry guard; a failure below leaves the instance marked initialized.
        INSTANCE.initialized = true;
        INSTANCE.appCtx = ccAppCtx;
        INSTANCE.hcc = hcc;
        INSTANCE.libraryManager = libraryManager;
        INSTANCE.resourceIdManager = resourceIdManager;
        // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
        // QQQ strip this out eventually
        PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ccAppCtx.getAppConfig());
        INSTANCE.compilerProperties = new CompilerProperties(propertiesAccessor);
        INSTANCE.externalProperties = new ExternalProperties(propertiesAccessor);
        INSTANCE.metadataProperties = new MetadataProperties(propertiesAccessor);
        INSTANCE.storageProperties = new StorageProperties(propertiesAccessor);
        INSTANCE.txnProperties = new TransactionProperties(propertiesAccessor);
        INSTANCE.feedProperties = new FeedProperties(propertiesAccessor);
        INSTANCE.extensionProperties = new ExtensionProperties(propertiesAccessor);
        INSTANCE.replicationProperties = new ReplicationProperties(propertiesAccessor);
        INSTANCE.buildProperties = new BuildProperties(propertiesAccessor);
        INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor);
        INSTANCE.metadataBootstrapSupplier = metadataBootstrapSupplier;
        INSTANCE.globalRecoveryManager = globalRecoveryManager;

        // Propagate the configured log level to both asterix and hyracks loggers.
        Logger.getLogger("org.apache.asterix").setLevel(INSTANCE.externalProperties.getLogLevel());
        Logger.getLogger("org.apache.hyracks").setLevel(INSTANCE.externalProperties.getLogLevel());
    }

    /** @return whether {@link #initialize} has been invoked. */
    public boolean initialized() {
        return initialized;
    }

    @Override
    public ICCApplicationContext getCCApplicationContext() {
        return appCtx;
    }

    @Override
    public StorageProperties getStorageProperties() {
        return storageProperties;
    }

    @Override
    public TransactionProperties getTransactionProperties() {
        return txnProperties;
    }

    @Override
    public CompilerProperties getCompilerProperties() {
        return compilerProperties;
    }

    @Override
    public MetadataProperties getMetadataProperties() {
        return metadataProperties;
    }

    @Override
    public ExternalProperties getExternalProperties() {
        return externalProperties;
    }

    @Override
    public FeedProperties getFeedProperties() {
        return feedProperties;
    }

    @Override
    public BuildProperties getBuildProperties() {
        return buildProperties;
    }

    public IHyracksClientConnection getHcc() {
        return hcc;
    }

    @Override
    public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
    }

    @Override
    public IStorageManager getStorageManager() {
        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
    }

    @Override
    public ReplicationProperties getReplicationProperties() {
        return replicationProperties;
    }

    @Override
    public IGlobalRecoveryManager getGlobalRecoveryManager() {
        return globalRecoveryManager;
    }

    @Override
    public ILibraryManager getLibraryManager() {
        return libraryManager;
    }

    public Object getExtensionManager() {
        return extensionManager;
    }

    public void setExtensionManager(Object extensionManager) {
        this.extensionManager = extensionManager;
    }

    public ExtensionProperties getExtensionProperties() {
        return extensionProperties;
    }

    @Override
    public MessagingProperties getMessagingProperties() {
        return messagingProperties;
    }

    public IResourceIdManager getResourceIdManager() {
        return resourceIdManager;
    }

    /** Resolves the metadata bootstrap lazily through the supplier provided at init time. */
    public IMetadataBootstrap getMetadataBootstrap() {
        return metadataBootstrapSupplier.get();
    }
}
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.event.schema.cluster.Cluster; +import org.apache.asterix.event.schema.cluster.Node; +import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage; +import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage; +import org.apache.asterix.runtime.message.NodeFailbackPlan; +import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState; +import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage; +import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage; +import org.apache.asterix.runtime.message.ReplicaEventMessage; +import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage; +import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage; +import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage; +import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * A holder class for properties related to the Asterix cluster. + */ + +public class ClusterStateManager { + /* + * TODO: currently after instance restarts we require all nodes to join again, + * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance + * shutdown and using it on startup to identify the nodes that are expected the join. 
+ */ + + private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName()); + public static final ClusterStateManager INSTANCE = new ClusterStateManager(); + private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address"; + private static final String IO_DEVICES = "iodevices"; + private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>(); + + private final Cluster cluster; + private ClusterState state = ClusterState.UNUSABLE; + + private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint; + + private boolean globalRecoveryCompleted = false; + + private Map<String, ClusterPartition[]> node2PartitionsMap = null; + private SortedMap<Integer, ClusterPartition> clusterPartitions = null; + private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null; + + private long clusterRequestId = 0; + private String currentMetadataNode = null; + private boolean metadataNodeActive = false; + private boolean autoFailover = false; + private boolean replicationEnabled = false; + private Set<String> failedNodes = new HashSet<>(); + private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans; + private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap; + + private ClusterStateManager() { + cluster = ClusterProperties.INSTANCE.getCluster(); + // if this is the CC process + if (AppContextInfo.INSTANCE.initialized() + && AppContextInfo.INSTANCE.getCCApplicationContext() != null) { + node2PartitionsMap = AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions(); + clusterPartitions = AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions(); + currentMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName(); + replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); + autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled(); + if (autoFailover) { + pendingTakeoverRequests = new HashMap<>(); + 
pendingProcessingFailbackPlans = new LinkedList<>(); + planId2FailbackPlanMap = new HashMap<>(); + } + } + } + + public synchronized void removeNCConfiguration(String nodeId) throws HyracksException { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Removing configuration parameters for node id " + nodeId); + } + activeNcConfiguration.remove(nodeId); + + //if this node was waiting for failback and failed before it completed + if (failedNodes.contains(nodeId)) { + if (autoFailover) { + notifyFailbackPlansNodeFailure(nodeId); + revertFailedFailbackPlanEffects(); + } + } else { + //an active node failed + failedNodes.add(nodeId); + if (nodeId.equals(currentMetadataNode)) { + metadataNodeActive = false; + LOGGER.info("Metadata node is now inactive"); + } + updateNodePartitions(nodeId, false); + if (replicationEnabled) { + notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE); + if (autoFailover) { + notifyFailbackPlansNodeFailure(nodeId); + requestPartitionsTakeover(nodeId); + } + } + } + } + + public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) + throws HyracksException { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Registering configuration parameters for node id " + nodeId); + } + activeNcConfiguration.put(nodeId, configuration); + + //a node trying to come back after failure + if (failedNodes.contains(nodeId)) { + if (autoFailover) { + prepareFailbackPlan(nodeId); + return; + } else { + //a node completed local or remote recovery and rejoined + failedNodes.remove(nodeId); + if (replicationEnabled) { + //notify other replica to reconnect to this node + notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); + } + } + } + + if (nodeId.equals(currentMetadataNode)) { + metadataNodeActive = true; + LOGGER.info("Metadata node is now active"); + } + updateNodePartitions(nodeId, true); + } + + private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException { + 
ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId); + // if this isn't a storage node, it will not have cluster partitions + if (nodePartitions != null) { + for (ClusterPartition p : nodePartitions) { + // set the active node for this node's partitions + p.setActive(added); + if (added) { + p.setActiveNodeId(nodeId); + } + } + resetClusterPartitionConstraint(); + updateClusterState(); + } + } + + private synchronized void updateClusterState() throws HyracksDataException { + for (ClusterPartition p : clusterPartitions.values()) { + if (!p.isActive()) { + state = ClusterState.UNUSABLE; + LOGGER.info("Cluster is in UNUSABLE state"); + return; + } + } + // if all storage partitions are active as well as the metadata node, then the cluster is active + if (metadataNodeActive) { + state = ClusterState.PENDING; + LOGGER.info("Cluster is now " + state); + AppContextInfo.INSTANCE.getMetadataBootstrap().init(); + state = ClusterState.ACTIVE; + LOGGER.info("Cluster is now " + state); + // start global recovery + AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery(); + if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) { + processPendingFailbackPlans(); + } + } else { + requestMetadataNodeTakeover(); + } + } + + /** + * Returns the IO devices configured for a Node Controller + * + * @param nodeId + * unique identifier of the Node Controller + * @return a list of IO devices. + */ + public synchronized String[] getIODevices(String nodeId) { + Map<String, String> ncConfig = activeNcConfiguration.get(nodeId); + if (ncConfig == null) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("Configuration parameters for nodeId " + nodeId + + " not found. 
The node has not joined yet or has left."); + } + return new String[0]; + } + return ncConfig.get(IO_DEVICES).split(","); + } + + public ClusterState getState() { + return state; + } + + public synchronized Node getAvailableSubstitutionNode() { + List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode(); + return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0); + } + + public synchronized Set<String> getParticipantNodes() { + Set<String> participantNodes = new HashSet<>(); + for (String pNode : activeNcConfiguration.keySet()) { + participantNodes.add(pNode); + } + return participantNodes; + } + + public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() { + if (clusterPartitionConstraint == null) { + resetClusterPartitionConstraint(); + } + return clusterPartitionConstraint; + } + + private synchronized void resetClusterPartitionConstraint() { + ArrayList<String> clusterActiveLocations = new ArrayList<>(); + for (ClusterPartition p : clusterPartitions.values()) { + if (p.isActive()) { + clusterActiveLocations.add(p.getActiveNodeId()); + } + } + clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint( + clusterActiveLocations.toArray(new String[] {})); + } + + public boolean isGlobalRecoveryCompleted() { + return globalRecoveryCompleted; + } + + public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) { + this.globalRecoveryCompleted = globalRecoveryCompleted; + } + + public boolean isClusterActive() { + if (cluster == null) { + // this is a virtual cluster + return true; + } + return state == ClusterState.ACTIVE; + } + + public static int getNumberOfNodes() { + return AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size(); + } + + public synchronized ClusterPartition[] getNodePartitions(String nodeId) { + return node2PartitionsMap.get(nodeId); + } + + public synchronized int getNodePartitionsCount(String node) { + if 
(node2PartitionsMap.containsKey(node)) { + return node2PartitionsMap.get(node).length; + } + return 0; + } + + public synchronized ClusterPartition[] getClusterPartitons() { + ArrayList<ClusterPartition> partitons = new ArrayList<>(); + for (ClusterPartition partition : clusterPartitions.values()) { + partitons.add(partition); + } + return partitons.toArray(new ClusterPartition[] {}); + } + + private synchronized void requestPartitionsTakeover(String failedNodeId) { + //replica -> list of partitions to takeover + Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>(); + ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); + + //collect the partitions of the failed NC + List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId); + if (!lostPartitions.isEmpty()) { + for (ClusterPartition partition : lostPartitions) { + //find replicas for this partitions + Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId()); + //find a replica that is still active + for (String replica : partitionReplicas) { + //TODO (mhubail) currently this assigns the partition to the first found active replica. + //It needs to be modified to consider load balancing. 
+ if (addActiveReplica(replica, partition, partitionRecoveryPlan)) { + break; + } + } + } + + if (partitionRecoveryPlan.size() == 0) { + //no active replicas were found for the failed node + LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions); + return; + } else { + LOGGER.info("Partitions to recover: " + lostPartitions); + } + ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() + .getMessageBroker(); + //For each replica, send a request to takeover the assigned partitions + for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) { + String replica = entry.getKey(); + Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]); + long requestId = clusterRequestId++; + TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, + replica, partitionsToTakeover); + pendingTakeoverRequests.put(requestId, takeoverRequest); + try { + messageBroker.sendApplicationMessageToNC(takeoverRequest, replica); + } catch (Exception e) { + /** + * if we fail to send the request, it means the NC we tried to send the request to + * has failed. When the failure notification arrives, we will send any pending request + * that belongs to the failed NC to a different active replica. 
+ */ + LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e); + } + } + } + } + + private boolean addActiveReplica(String replica, ClusterPartition partition, + Map<String, List<Integer>> partitionRecoveryPlan) { + if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) { + if (!partitionRecoveryPlan.containsKey(replica)) { + List<Integer> replicaPartitions = new ArrayList<>(); + replicaPartitions.add(partition.getPartitionId()); + partitionRecoveryPlan.put(replica, replicaPartitions); + } else { + partitionRecoveryPlan.get(replica).add(partition.getPartitionId()); + } + return true; + } + return false; + } + + private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) { + List<ClusterPartition> nodePartitions = new ArrayList<>(); + for (ClusterPartition partition : clusterPartitions.values()) { + if (partition.getActiveNodeId().equals(nodeId)) { + nodePartitions.add(partition); + } + } + /** + * if there is any pending takeover request that this node was supposed to handle, + * it needs to be sent to a different replica + */ + List<Long> failedTakeoverRequests = new ArrayList<>(); + for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) { + if (request.getNodeId().equals(nodeId)) { + for (Integer partitionId : request.getPartitions()) { + nodePartitions.add(clusterPartitions.get(partitionId)); + } + failedTakeoverRequests.add(request.getRequestId()); + } + } + + //remove failed requests + for (Long requestId : failedTakeoverRequests) { + pendingTakeoverRequests.remove(requestId); + } + return nodePartitions; + } + + private synchronized void requestMetadataNodeTakeover() { + //need a new node to takeover metadata node + ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties() + .getMetadataPartition(); + //request the metadataPartition node to register itself as the metadata node + TakeoverMetadataNodeRequestMessage 
takeoverRequest = new TakeoverMetadataNodeRequestMessage(); + ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() + .getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId()); + } catch (Exception e) { + /** + * if we fail to send the request, it means the NC we tried to send the request to + * has failed. When the failure notification arrives, a new NC will be assigned to + * the metadata partition and a new metadata node takeover request will be sent to it. + */ + LOGGER.log(Level.WARNING, + "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e); + } + } + + public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response) + throws HyracksDataException { + for (Integer partitonId : response.getPartitions()) { + ClusterPartition partition = clusterPartitions.get(partitonId); + partition.setActive(true); + partition.setActiveNodeId(response.getNodeId()); + } + pendingTakeoverRequests.remove(response.getRequestId()); + resetClusterPartitionConstraint(); + updateClusterState(); + } + + public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response) + throws HyracksDataException { + currentMetadataNode = response.getNodeId(); + metadataNodeActive = true; + LOGGER.info("Current metadata node: " + currentMetadataNode); + updateClusterState(); + } + + private synchronized void prepareFailbackPlan(String failingBackNodeId) { + NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId); + pendingProcessingFailbackPlans.add(plan); + planId2FailbackPlanMap.put(plan.getPlanId(), plan); + + //get all partitions this node requires to resync + ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); + Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId); + 
for (String replicaId : nodeReplicas) { + ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId); + for (ClusterPartition partition : nodePartitions) { + plan.addParticipant(partition.getActiveNodeId()); + /** + * if the partition original node is the returning node, + * add it to the list of the partitions which will be failed back + */ + if (partition.getNodeId().equals(failingBackNodeId)) { + plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId()); + } + } + } + + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Prepared Failback plan: " + plan.toString()); + } + + processPendingFailbackPlans(); + } + + private synchronized void processPendingFailbackPlans() { + /** + * if the cluster state is not ACTIVE, then failbacks should not be processed + * since some partitions are not active + */ + if (state == ClusterState.ACTIVE) { + while (!pendingProcessingFailbackPlans.isEmpty()) { + //take the first pending failback plan + NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop(); + /** + * A plan at this stage will be in one of two states: + * 1. PREPARING -> the participants were selected but we haven't sent any request. + * 2. 
PENDING_ROLLBACK -> a participant failed before we send any requests + */ + if (plan.getState() == FailbackPlanState.PREPARING) { + //set the partitions that will be failed back as inactive + String failbackNode = plan.getNodeId(); + for (Integer partitionId : plan.getPartitionsToFailback()) { + ClusterPartition clusterPartition = clusterPartitions.get(partitionId); + clusterPartition.setActive(false); + //partition expected to be returned to the failing back node + clusterPartition.setActiveNodeId(failbackNode); + } + + /** + * if the returning node is the original metadata node, + * then metadata node will change after the failback completes + */ + String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties() + .getMetadataNodeName(); + if (originalMetadataNode.equals(failbackNode)) { + plan.setNodeToReleaseMetadataManager(currentMetadataNode); + currentMetadataNode = ""; + metadataNodeActive = false; + } + + //force new jobs to wait + state = ClusterState.REBALANCING; + ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE + .getCCApplicationContext().getMessageBroker(); + handleFailbackRequests(plan, messageBroker); + /** + * wait until the current plan is completed before processing the next plan. + * when the current one completes or is reverted, the cluster state will be + * ACTIVE again, and the next failback plan (if any) will be processed. 
+ */ + break; + } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { + //this plan failed before sending any requests -> nothing to rollback + planId2FailbackPlanMap.remove(plan.getPlanId()); + } + } + } + } + + private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) { + //send requests to other nodes to complete on-going jobs and prepare partitions for failback + for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) { + try { + messageBroker.sendApplicationMessageToNC(request, request.getNodeID()); + plan.addPendingRequest(request); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e); + plan.notifyNodeFailure(request.getNodeID()); + revertFailedFailbackPlanEffects(); + break; + } + } + } + + public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) { + NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId()); + plan.markRequestCompleted(msg.getRequestId()); + /** + * A plan at this stage will be in one of three states: + * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait). + * 2. PENDING_COMPLETION -> all responses received (time to send completion request). + * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert). 
+ */ + if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) { + CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage(); + + //send complete resync and takeover partitions to the failing back node + ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() + .getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(request, request.getNodeId()); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e); + notifyFailbackPlansNodeFailure(request.getNodeId()); + revertFailedFailbackPlanEffects(); + } + } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { + revertFailedFailbackPlanEffects(); + } + } + + public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response) + throws HyracksDataException { + /** + * the failback plan completed successfully: + * Remove all references to it. + * Remove the the failing back node from the failed nodes list. + * Notify its replicas to reconnect to it. + * Set the failing back node partitions as active. 
+ */ + NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId()); + String nodeId = plan.getNodeId(); + failedNodes.remove(nodeId); + //notify impacted replicas they can reconnect to this node + notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); + updateNodePartitions(nodeId, true); + } + + private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) { + ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); + Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId); + String nodeIdAddress = ""; + //in case the node joined with a new IP address, we need to send it to the other replicas + if (event == ClusterEventType.NODE_JOIN) { + nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY); + } + + ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event); + ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() + .getMessageBroker(); + for (String replica : remoteReplicas) { + //if the remote replica is alive, send the event + if (activeNcConfiguration.containsKey(replica)) { + try { + messageBroker.sendApplicationMessageToNC(msg, replica); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e); + } + } + } + } + + private synchronized void revertFailedFailbackPlanEffects() { + Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); + while (iterator.hasNext()) { + NodeFailbackPlan plan = iterator.next(); + if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { + //TODO if the failing back node is still active, notify it to construct a new plan for it + iterator.remove(); + + //reassign the partitions that were supposed to be failed back to an active replica + requestPartitionsTakeover(plan.getNodeId()); + } + } + } + + private synchronized void 
notifyFailbackPlansNodeFailure(String nodeId) { + Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); + while (iterator.hasNext()) { + NodeFailbackPlan plan = iterator.next(); + plan.notifyNodeFailure(nodeId); + } + } + + public synchronized boolean isMetadataNodeActive() { + return metadataNodeActive; + } + + public synchronized ObjectNode getClusterStateDescription() { + ObjectMapper om = new ObjectMapper(); + ObjectNode stateDescription = om.createObjectNode(); + stateDescription.put("state", state.name()); + stateDescription.put("metadata_node", currentMetadataNode); + ArrayNode ncs = om.createArrayNode(); + stateDescription.set("ncs",ncs); + for (Map.Entry<String, ClusterPartition[]> entry : node2PartitionsMap.entrySet()) { + ObjectNode nodeJSON = om.createObjectNode(); + nodeJSON.put("node_id", entry.getKey()); + boolean allActive = true; + boolean anyActive = false; + Set<Map<String, Object>> partitions = new HashSet<>(); + for (ClusterPartition part : entry.getValue()) { + HashMap<String, Object> partition = new HashMap<>(); + partition.put("partition_id", "partition_" + part.getPartitionId()); + partition.put("active", part.isActive()); + partitions.add(partition); + allActive = allActive && part.isActive(); + if (allActive) { + anyActive = true; + } + } + nodeJSON.put("state", failedNodes.contains(entry.getKey()) ? "FAILED" + : allActive ? "ACTIVE" + : anyActive ? 
"PARTIALLY_ACTIVE" + : "INACTIVE"); + nodeJSON.putPOJO("partitions", partitions); + ncs.add(nodeJSON); + } + return stateDescription; + } + + public synchronized ObjectNode getClusterStateSummary() { + ObjectMapper om = new ObjectMapper(); + ObjectNode stateDescription = om.createObjectNode(); + stateDescription.put("state", state.name()); + stateDescription.putPOJO("metadata_node", currentMetadataNode); + stateDescription.putPOJO("partitions", clusterPartitions); + return stateDescription; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java new file mode 100644 index 0000000..117fa9e --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.utils; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; +import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.file.IFileMapProvider; +import org.apache.hyracks.storage.common.file.ILocalResourceRepository; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; + +public class RuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManager, + ILSMIOOperationSchedulerProvider { + + private static final long serialVersionUID = 1L; + + public static final RuntimeComponentsProvider RUNTIME_PROVIDER = new RuntimeComponentsProvider(); + + private RuntimeComponentsProvider() { + } + + @Override + public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) { + return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) + .getLSMIOScheduler(); + } + + @Override + public IBufferCache getBufferCache(IHyracksTaskContext ctx) { + return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) + .getBufferCache(); + } + + @Override + public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) { + return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) + .getFileMapManager(); + } + + @Override + public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) { + return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) + 
.getLocalResourceRepository(); + } + + @Override + public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) { + return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) + .getDatasetLifecycleManager(); + } + + @Override + public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) { + return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) + .getResourceIdFactory(); + } + +}
