http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
deleted file mode 100644
index c79524a..0000000
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.event.schema.cluster.Cluster;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.runtime.message.NodeFailbackPlan;
-import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
-import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
-import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * A holder class for properties related to the Asterix cluster.
- */
-
-public class ClusterStateManager {
-    /*
-     * TODO: currently after instance restarts we require all nodes to join 
again,
-     * otherwise the cluster wont be ACTIVE. we may overcome this by storing 
the cluster state before the instance
-     * shutdown and using it on startup to identify the nodes that are 
expected the join.
-     */
-
-    private static final Logger LOGGER = 
Logger.getLogger(ClusterStateManager.class.getName());
-    public static final ClusterStateManager INSTANCE = new 
ClusterStateManager();
-    private static final String CLUSTER_NET_IP_ADDRESS_KEY = 
"cluster-net-ip-address";
-    private static final String IO_DEVICES = "iodevices";
-    private Map<String, Map<String, String>> activeNcConfiguration = new 
HashMap<>();
-
-    private final Cluster cluster;
-    private ClusterState state = ClusterState.UNUSABLE;
-
-    private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
-
-    private boolean globalRecoveryCompleted = false;
-
-    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
-    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
-    private Map<Long, TakeoverPartitionsRequestMessage> 
pendingTakeoverRequests = null;
-
-    private long clusterRequestId = 0;
-    private String currentMetadataNode = null;
-    private boolean metadataNodeActive = false;
-    private boolean autoFailover = false;
-    private boolean replicationEnabled = false;
-    private Set<String> failedNodes = new HashSet<>();
-    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
-    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
-
-    private ClusterStateManager() {
-        cluster = ClusterProperties.INSTANCE.getCluster();
-        // if this is the CC process
-        if (AppContextInfo.INSTANCE.initialized()
-                && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
-            node2PartitionsMap = 
AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
-            clusterPartitions = 
AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
-            currentMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
-            replicationEnabled = 
ClusterProperties.INSTANCE.isReplicationEnabled();
-            autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
-            if (autoFailover) {
-                pendingTakeoverRequests = new HashMap<>();
-                pendingProcessingFailbackPlans = new LinkedList<>();
-                planId2FailbackPlanMap = new HashMap<>();
-            }
-        }
-    }
-
-    public synchronized void removeNCConfiguration(String nodeId) throws 
HyracksException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Removing configuration parameters for node id " + 
nodeId);
-        }
-        activeNcConfiguration.remove(nodeId);
-
-        //if this node was waiting for failback and failed before it completed
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                notifyFailbackPlansNodeFailure(nodeId);
-                revertFailedFailbackPlanEffects();
-            }
-        } else {
-            //an active node failed
-            failedNodes.add(nodeId);
-            if (nodeId.equals(currentMetadataNode)) {
-                metadataNodeActive = false;
-                LOGGER.info("Metadata node is now inactive");
-            }
-            updateNodePartitions(nodeId, false);
-            if (replicationEnabled) {
-                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
-                if (autoFailover) {
-                    notifyFailbackPlansNodeFailure(nodeId);
-                    requestPartitionsTakeover(nodeId);
-                }
-            }
-        }
-    }
-
-    public synchronized void addNCConfiguration(String nodeId, Map<String, 
String> configuration)
-            throws HyracksException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Registering configuration parameters for node id " + 
nodeId);
-        }
-        activeNcConfiguration.put(nodeId, configuration);
-
-        //a node trying to come back after failure
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                prepareFailbackPlan(nodeId);
-                return;
-            } else {
-                //a node completed local or remote recovery and rejoined
-                failedNodes.remove(nodeId);
-                if (replicationEnabled) {
-                    //notify other replica to reconnect to this node
-                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-                }
-            }
-        }
-
-        if (nodeId.equals(currentMetadataNode)) {
-            metadataNodeActive = true;
-            LOGGER.info("Metadata node is now active");
-        }
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void updateNodePartitions(String nodeId, boolean 
added) throws HyracksDataException {
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
-        if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                // set the active node for this node's partitions
-                p.setActive(added);
-                if (added) {
-                    p.setActiveNodeId(nodeId);
-                }
-            }
-            resetClusterPartitionConstraint();
-            updateClusterState();
-        }
-    }
-
-    private synchronized void updateClusterState() throws HyracksDataException 
{
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (!p.isActive()) {
-                state = ClusterState.UNUSABLE;
-                LOGGER.info("Cluster is in UNUSABLE state");
-                return;
-            }
-        }
-        // if all storage partitions are active as well as the metadata node, 
then the cluster is active
-        if (metadataNodeActive) {
-            state = ClusterState.PENDING;
-            LOGGER.info("Cluster is now " + state);
-            AppContextInfo.INSTANCE.getMetadataBootstrap().init();
-            state = ClusterState.ACTIVE;
-            LOGGER.info("Cluster is now " + state);
-            // start global recovery
-            
AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
-            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
-                processPendingFailbackPlans();
-            }
-        } else {
-            requestMetadataNodeTakeover();
-        }
-    }
-
-    /**
-     * Returns the IO devices configured for a Node Controller
-     *
-     * @param nodeId
-     *            unique identifier of the Node Controller
-     * @return a list of IO devices.
-     */
-    public synchronized String[] getIODevices(String nodeId) {
-        Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
-        if (ncConfig == null) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Configuration parameters for nodeId " + nodeId
-                        + " not found. The node has not joined yet or has 
left.");
-            }
-            return new String[0];
-        }
-        return ncConfig.get(IO_DEVICES).split(",");
-    }
-
-    public ClusterState getState() {
-        return state;
-    }
-
-    public synchronized Node getAvailableSubstitutionNode() {
-        List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : 
cluster.getSubstituteNodes().getNode();
-        return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
-    }
-
-    public synchronized Set<String> getParticipantNodes() {
-        Set<String> participantNodes = new HashSet<>();
-        for (String pNode : activeNcConfiguration.keySet()) {
-            participantNodes.add(pNode);
-        }
-        return participantNodes;
-    }
-
-    public synchronized AlgebricksAbsolutePartitionConstraint 
getClusterLocations() {
-        if (clusterPartitionConstraint == null) {
-            resetClusterPartitionConstraint();
-        }
-        return clusterPartitionConstraint;
-    }
-
-    private synchronized void resetClusterPartitionConstraint() {
-        ArrayList<String> clusterActiveLocations = new ArrayList<>();
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (p.isActive()) {
-                clusterActiveLocations.add(p.getActiveNodeId());
-            }
-        }
-        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
-                clusterActiveLocations.toArray(new String[] {}));
-    }
-
-    public boolean isGlobalRecoveryCompleted() {
-        return globalRecoveryCompleted;
-    }
-
-    public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
-        this.globalRecoveryCompleted = globalRecoveryCompleted;
-    }
-
-    public boolean isClusterActive() {
-        if (cluster == null) {
-            // this is a virtual cluster
-            return true;
-        }
-        return state == ClusterState.ACTIVE;
-    }
-
-    public static int getNumberOfNodes() {
-        return 
AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
-    }
-
-    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
-        return node2PartitionsMap.get(nodeId);
-    }
-
-    public synchronized int getNodePartitionsCount(String node) {
-        if (node2PartitionsMap.containsKey(node)) {
-            return node2PartitionsMap.get(node).length;
-        }
-        return 0;
-    }
-
-    public synchronized ClusterPartition[] getClusterPartitons() {
-        ArrayList<ClusterPartition> partitons = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            partitons.add(partition);
-        }
-        return partitons.toArray(new ClusterPartition[] {});
-    }
-
-    private synchronized void requestPartitionsTakeover(String failedNodeId) {
-        //replica -> list of partitions to takeover
-        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
-
-        //collect the partitions of the failed NC
-        List<ClusterPartition> lostPartitions = 
getNodeAssignedPartitions(failedNodeId);
-        if (!lostPartitions.isEmpty()) {
-            for (ClusterPartition partition : lostPartitions) {
-                //find replicas for this partitions
-                Set<String> partitionReplicas = 
replicationProperties.getNodeReplicasIds(partition.getNodeId());
-                //find a replica that is still active
-                for (String replica : partitionReplicas) {
-                    //TODO (mhubail) currently this assigns the partition to 
the first found active replica.
-                    //It needs to be modified to consider load balancing.
-                    if (addActiveReplica(replica, partition, 
partitionRecoveryPlan)) {
-                        break;
-                    }
-                }
-            }
-
-            if (partitionRecoveryPlan.size() == 0) {
-                //no active replicas were found for the failed node
-                LOGGER.severe("Could not find active replicas for the 
partitions " + lostPartitions);
-                return;
-            } else {
-                LOGGER.info("Partitions to recover: " + lostPartitions);
-            }
-            ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            //For each replica, send a request to takeover the assigned 
partitions
-            for (Entry<String, List<Integer>> entry : 
partitionRecoveryPlan.entrySet()) {
-                String replica = entry.getKey();
-                Integer[] partitionsToTakeover = entry.getValue().toArray(new 
Integer[entry.getValue().size()]);
-                long requestId = clusterRequestId++;
-                TakeoverPartitionsRequestMessage takeoverRequest = new 
TakeoverPartitionsRequestMessage(requestId,
-                        replica, partitionsToTakeover);
-                pendingTakeoverRequests.put(requestId, takeoverRequest);
-                try {
-                    messageBroker.sendApplicationMessageToNC(takeoverRequest, 
replica);
-                } catch (Exception e) {
-                    /**
-                     * if we fail to send the request, it means the NC we 
tried to send the request to
-                     * has failed. When the failure notification arrives, we 
will send any pending request
-                     * that belongs to the failed NC to a different active 
replica.
-                     */
-                    LOGGER.log(Level.WARNING, "Failed to send takeover 
request: " + takeoverRequest, e);
-                }
-            }
-        }
-    }
-
-    private boolean addActiveReplica(String replica, ClusterPartition 
partition,
-            Map<String, List<Integer>> partitionRecoveryPlan) {
-        if (activeNcConfiguration.containsKey(replica) && 
!failedNodes.contains(replica)) {
-            if (!partitionRecoveryPlan.containsKey(replica)) {
-                List<Integer> replicaPartitions = new ArrayList<>();
-                replicaPartitions.add(partition.getPartitionId());
-                partitionRecoveryPlan.put(replica, replicaPartitions);
-            } else {
-                
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-            }
-            return true;
-        }
-        return false;
-    }
-
-    private synchronized List<ClusterPartition> 
getNodeAssignedPartitions(String nodeId) {
-        List<ClusterPartition> nodePartitions = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            if (partition.getActiveNodeId().equals(nodeId)) {
-                nodePartitions.add(partition);
-            }
-        }
-        /**
-         * if there is any pending takeover request that this node was 
supposed to handle,
-         * it needs to be sent to a different replica
-         */
-        List<Long> failedTakeoverRequests = new ArrayList<>();
-        for (TakeoverPartitionsRequestMessage request : 
pendingTakeoverRequests.values()) {
-            if (request.getNodeId().equals(nodeId)) {
-                for (Integer partitionId : request.getPartitions()) {
-                    nodePartitions.add(clusterPartitions.get(partitionId));
-                }
-                failedTakeoverRequests.add(request.getRequestId());
-            }
-        }
-
-        //remove failed requests
-        for (Long requestId : failedTakeoverRequests) {
-            pendingTakeoverRequests.remove(requestId);
-        }
-        return nodePartitions;
-    }
-
-    private synchronized void requestMetadataNodeTakeover() {
-        //need a new node to takeover metadata node
-        ClusterPartition metadataPartiton = 
AppContextInfo.INSTANCE.getMetadataProperties()
-                .getMetadataPartition();
-        //request the metadataPartition node to register itself as the 
metadata node
-        TakeoverMetadataNodeRequestMessage takeoverRequest = new 
TakeoverMetadataNodeRequestMessage();
-        ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        try {
-            messageBroker.sendApplicationMessageToNC(takeoverRequest, 
metadataPartiton.getActiveNodeId());
-        } catch (Exception e) {
-            /**
-             * if we fail to send the request, it means the NC we tried to 
send the request to
-             * has failed. When the failure notification arrives, a new NC 
will be assigned to
-             * the metadata partition and a new metadata node takeover request 
will be sent to it.
-             */
-            LOGGER.log(Level.WARNING,
-                    "Failed to send metadata node takeover request to: " + 
metadataPartiton.getActiveNodeId(), e);
-        }
-    }
-
-    public synchronized void 
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
-            throws HyracksDataException {
-        for (Integer partitonId : response.getPartitions()) {
-            ClusterPartition partition = clusterPartitions.get(partitonId);
-            partition.setActive(true);
-            partition.setActiveNodeId(response.getNodeId());
-        }
-        pendingTakeoverRequests.remove(response.getRequestId());
-        resetClusterPartitionConstraint();
-        updateClusterState();
-    }
-
-    public synchronized void 
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage 
response)
-            throws HyracksDataException {
-        currentMetadataNode = response.getNodeId();
-        metadataNodeActive = true;
-        LOGGER.info("Current metadata node: " + currentMetadataNode);
-        updateClusterState();
-    }
-
-    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
-        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
-        pendingProcessingFailbackPlans.add(plan);
-        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
-        //get all partitions this node requires to resync
-        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> nodeReplicas = 
replicationProperties.getNodeReplicationClients(failingBackNodeId);
-        for (String replicaId : nodeReplicas) {
-            ClusterPartition[] nodePartitions = 
node2PartitionsMap.get(replicaId);
-            for (ClusterPartition partition : nodePartitions) {
-                plan.addParticipant(partition.getActiveNodeId());
-                /**
-                 * if the partition original node is the returning node,
-                 * add it to the list of the partitions which will be failed 
back
-                 */
-                if (partition.getNodeId().equals(failingBackNodeId)) {
-                    plan.addPartitionToFailback(partition.getPartitionId(), 
partition.getActiveNodeId());
-                }
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Prepared Failback plan: " + plan.toString());
-        }
-
-        processPendingFailbackPlans();
-    }
-
-    private synchronized void processPendingFailbackPlans() {
-        /**
-         * if the cluster state is not ACTIVE, then failbacks should not be 
processed
-         * since some partitions are not active
-         */
-        if (state == ClusterState.ACTIVE) {
-            while (!pendingProcessingFailbackPlans.isEmpty()) {
-                //take the first pending failback plan
-                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
-                /**
-                 * A plan at this stage will be in one of two states:
-                 * 1. PREPARING -> the participants were selected but we 
haven't sent any request.
-                 * 2. PENDING_ROLLBACK -> a participant failed before we send 
any requests
-                 */
-                if (plan.getState() == FailbackPlanState.PREPARING) {
-                    //set the partitions that will be failed back as inactive
-                    String failbackNode = plan.getNodeId();
-                    for (Integer partitionId : plan.getPartitionsToFailback()) 
{
-                        ClusterPartition clusterPartition = 
clusterPartitions.get(partitionId);
-                        clusterPartition.setActive(false);
-                        //partition expected to be returned to the failing 
back node
-                        clusterPartition.setActiveNodeId(failbackNode);
-                    }
-
-                    /**
-                     * if the returning node is the original metadata node,
-                     * then metadata node will change after the failback 
completes
-                     */
-                    String originalMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties()
-                            .getMetadataNodeName();
-                    if (originalMetadataNode.equals(failbackNode)) {
-                        
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
-                        currentMetadataNode = "";
-                        metadataNodeActive = false;
-                    }
-
-                    //force new jobs to wait
-                    state = ClusterState.REBALANCING;
-                    ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE
-                            .getCCApplicationContext().getMessageBroker();
-                    handleFailbackRequests(plan, messageBroker);
-                    /**
-                     * wait until the current plan is completed before 
processing the next plan.
-                     * when the current one completes or is reverted, the 
cluster state will be
-                     * ACTIVE again, and the next failback plan (if any) will 
be processed.
-                     */
-                    break;
-                } else if (plan.getState() == 
FailbackPlanState.PENDING_ROLLBACK) {
-                    //this plan failed before sending any requests -> nothing 
to rollback
-                    planId2FailbackPlanMap.remove(plan.getPlanId());
-                }
-            }
-        }
-    }
-
-    private void handleFailbackRequests(NodeFailbackPlan plan, 
ICCMessageBroker messageBroker) {
-        //send requests to other nodes to complete on-going jobs and prepare 
partitions for failback
-        for (PreparePartitionsFailbackRequestMessage request : 
plan.getPlanFailbackRequests()) {
-            try {
-                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeID());
-                plan.addPendingRequest(request);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send failback request to: 
" + request.getNodeID(), e);
-                plan.notifyNodeFailure(request.getNodeID());
-                revertFailedFailbackPlanEffects();
-                break;
-            }
-        }
-    }
-
-    public synchronized void 
processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage
 msg) {
-        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
-        plan.markRequestCompleted(msg.getRequestId());
-        /**
-         * A plan at this stage will be in one of three states:
-         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still 
expected (wait).
-         * 2. PENDING_COMPLETION -> all responses received (time to send 
completion request).
-         * 3. PENDING_ROLLBACK -> the plan failed and we just received the 
final pending response (revert).
-         */
-        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
-            CompleteFailbackRequestMessage request = 
plan.getCompleteFailbackRequestMessage();
-
-            //send complete resync and takeover partitions to the failing back 
node
-            ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            try {
-                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeId());
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send complete failback 
request to: " + request.getNodeId(), e);
-                notifyFailbackPlansNodeFailure(request.getNodeId());
-                revertFailedFailbackPlanEffects();
-            }
-        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-            revertFailedFailbackPlanEffects();
-        }
-    }
-
-    public synchronized void 
processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
-            throws HyracksDataException {
-        /**
-         * the failback plan completed successfully:
-         * Remove all references to it.
-         * Remove the the failing back node from the failed nodes list.
-         * Notify its replicas to reconnect to it.
-         * Set the failing back node partitions as active.
-         */
-        NodeFailbackPlan plan = 
planId2FailbackPlanMap.remove(response.getPlanId());
-        String nodeId = plan.getNodeId();
-        failedNodes.remove(nodeId);
-        //notify impacted replicas they can reconnect to this node
-        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void notifyImpactedReplicas(String nodeId, 
ClusterEventType event) {
-        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> remoteReplicas = 
replicationProperties.getRemoteReplicasIds(nodeId);
-        String nodeIdAddress = "";
-        //in case the node joined with a new IP address, we need to send it to 
the other replicas
-        if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = 
activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
-        }
-
-        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, 
nodeIdAddress, event);
-        ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        for (String replica : remoteReplicas) {
-            //if the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, replica);
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Failed sending an application 
message to an NC", e);
-                }
-            }
-        }
-    }
-
-    private synchronized void revertFailedFailbackPlanEffects() {
-        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                //TODO if the failing back node is still active, notify it to 
construct a new plan for it
-                iterator.remove();
-
-                //reassign the partitions that were supposed to be failed back 
to an active replica
-                requestPartitionsTakeover(plan.getNodeId());
-            }
-        }
-    }
-
-    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
-        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            plan.notifyNodeFailure(nodeId);
-        }
-    }
-
-    public synchronized boolean isMetadataNodeActive() {
-        return metadataNodeActive;
-    }
-
-    public synchronized ObjectNode getClusterStateDescription()  {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode stateDescription = om.createObjectNode();
-        stateDescription.put("state", state.name());
-        stateDescription.put("metadata_node", currentMetadataNode);
-        ArrayNode ncs = om.createArrayNode();
-        stateDescription.set("ncs",ncs);
-        for (Map.Entry<String, ClusterPartition[]> entry : 
node2PartitionsMap.entrySet()) {
-            ObjectNode nodeJSON = om.createObjectNode();
-            nodeJSON.put("node_id", entry.getKey());
-            boolean allActive = true;
-            boolean anyActive = false;
-            Set<Map<String, Object>> partitions = new HashSet<>();
-            for (ClusterPartition part : entry.getValue()) {
-                HashMap<String, Object> partition = new HashMap<>();
-                partition.put("partition_id", "partition_" + 
part.getPartitionId());
-                partition.put("active", part.isActive());
-                partitions.add(partition);
-                allActive = allActive && part.isActive();
-                if (allActive) {
-                    anyActive = true;
-                }
-            }
-            nodeJSON.put("state", failedNodes.contains(entry.getKey()) ? 
"FAILED"
-                    : allActive ? "ACTIVE"
-                    : anyActive ? "PARTIALLY_ACTIVE"
-                    : "INACTIVE");
-            nodeJSON.putPOJO("partitions", partitions);
-            ncs.add(nodeJSON);
-        }
-        return stateDescription;
-    }
-
-    public synchronized ObjectNode getClusterStateSummary() {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode stateDescription = om.createObjectNode();
-        stateDescription.put("state", state.name());
-        stateDescription.putPOJO("metadata_node", currentMetadataNode);
-        stateDescription.putPOJO("partitions", clusterPartitions);
-        return stateDescription;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java
deleted file mode 100644
index 5b4daa6..0000000
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeComponentsProvider.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.util;
-
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-import org.apache.hyracks.storage.common.file.IFileMapProvider;
-import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
-
-public class RuntimeComponentsProvider implements 
IIndexLifecycleManagerProvider, IStorageManagerInterface,
-        ILSMIOOperationSchedulerProvider {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final RuntimeComponentsProvider RUNTIME_PROVIDER = new 
RuntimeComponentsProvider();
-
-    private RuntimeComponentsProvider() {
-    }
-
-    @Override
-    public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getLSMIOScheduler();
-    }
-
-    @Override
-    public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getBufferCache();
-    }
-
-    @Override
-    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getFileMapManager();
-    }
-
-    @Override
-    public ILocalResourceRepository 
getLocalResourceRepository(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getLocalResourceRepository();
-    }
-
-    @Override
-    public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext 
ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getDatasetLifecycleManager();
-    }
-
-    @Override
-    public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getResourceIdFactory();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
deleted file mode 100644
index 608def7..0000000
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.util;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-
-/**
- * Utility class for obtaining information on the set of Hyracks NodeController
- * processes that are running on a given host.
- */
-public class RuntimeUtils {
-
-    private RuntimeUtils() {
-    }
-
-    public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) 
throws HyracksDataException {
-        Map<InetAddress, Set<String>> nodeControllerInfo = 
getNodeControllerMap();
-        return nodeControllerInfo.get(ipAddress);
-    }
-
-    public static List<String> getAllNodeControllers() throws 
HyracksDataException {
-        Collection<Set<String>> nodeControllersCollection = 
getNodeControllerMap().values();
-        List<String> nodeControllers = new ArrayList<>();
-        for (Set<String> ncCollection : nodeControllersCollection) {
-            nodeControllers.addAll(ncCollection);
-        }
-        return nodeControllers;
-    }
-
-    public static Map<InetAddress, Set<String>> getNodeControllerMap() throws 
HyracksDataException {
-        Map<InetAddress, Set<String>> map = new HashMap<>();
-        
AppContextInfo.INSTANCE.getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
-        return map;
-    }
-
-    public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) 
{
-        ClusterControllerService ccs = (ClusterControllerService) 
AppContextInfo.INSTANCE
-                .getCCApplicationContext().getControllerService();
-        INodeManager nodeManager = ccs.getNodeManager();
-        map.putAll(nodeManager.getIpAddressNodeNameMap());
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
new file mode 100644
index 0000000..355f503
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.utils;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.config.BuildProperties;
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.common.config.ExtensionProperties;
+import org.apache.asterix.common.config.ExternalProperties;
+import org.apache.asterix.common.config.FeedProperties;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.PropertiesAccessor;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.dataflow.IApplicationContextInfo;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+/*
+ * Acts as an holder class for IndexRegistryProvider, AsterixStorageManager
+ * instances that are accessed from the NCs. In addition an instance of 
ICCApplicationContext
+ * is stored for access by the CC.
+ */
+public class AppContextInfo implements IApplicationContextInfo, 
IPropertiesProvider {
+
+    public static final AppContextInfo INSTANCE = new AppContextInfo();
+    private ICCApplicationContext appCtx;
+    private IGlobalRecoveryManager globalRecoveryManager;
+    private ILibraryManager libraryManager;
+    private IResourceIdManager resourceIdManager;
+    private CompilerProperties compilerProperties;
+    private ExternalProperties externalProperties;
+    private MetadataProperties metadataProperties;
+    private StorageProperties storageProperties;
+    private TransactionProperties txnProperties;
+    private FeedProperties feedProperties;
+    private BuildProperties buildProperties;
+    private ReplicationProperties replicationProperties;
+    private ExtensionProperties extensionProperties;
+    private MessagingProperties messagingProperties;
+    private Supplier<IMetadataBootstrap> metadataBootstrapSupplier;
+    private IHyracksClientConnection hcc;
+    private Object extensionManager;
+    private volatile boolean initialized = false;
+
+    private AppContextInfo() {
+    }
+
+    public static synchronized void initialize(ICCApplicationContext ccAppCtx, 
IHyracksClientConnection hcc,
+            ILibraryManager libraryManager, IResourceIdManager 
resourceIdManager,
+            Supplier<IMetadataBootstrap> metadataBootstrapSupplier, 
IGlobalRecoveryManager globalRecoveryManager)
+            throws AsterixException, IOException {
+        if (INSTANCE.initialized) {
+            throw new AsterixException(AppContextInfo.class.getSimpleName() + 
" has been initialized already");
+        }
+        INSTANCE.initialized = true;
+        INSTANCE.appCtx = ccAppCtx;
+        INSTANCE.hcc = hcc;
+        INSTANCE.libraryManager = libraryManager;
+        INSTANCE.resourceIdManager = resourceIdManager;
+        // Determine whether to use old-style asterix-configuration.xml or 
new-style configuration.
+        // QQQ strip this out eventually
+        PropertiesAccessor propertiesAccessor = 
PropertiesAccessor.getInstance(ccAppCtx.getAppConfig());
+        INSTANCE.compilerProperties = new 
CompilerProperties(propertiesAccessor);
+        INSTANCE.externalProperties = new 
ExternalProperties(propertiesAccessor);
+        INSTANCE.metadataProperties = new 
MetadataProperties(propertiesAccessor);
+        INSTANCE.storageProperties = new StorageProperties(propertiesAccessor);
+        INSTANCE.txnProperties = new TransactionProperties(propertiesAccessor);
+        INSTANCE.feedProperties = new FeedProperties(propertiesAccessor);
+        INSTANCE.extensionProperties = new 
ExtensionProperties(propertiesAccessor);
+        INSTANCE.replicationProperties = new 
ReplicationProperties(propertiesAccessor);
+        INSTANCE.hcc = hcc;
+        INSTANCE.buildProperties = new BuildProperties(propertiesAccessor);
+        INSTANCE.messagingProperties = new 
MessagingProperties(propertiesAccessor);
+        INSTANCE.metadataBootstrapSupplier = metadataBootstrapSupplier;
+        INSTANCE.globalRecoveryManager = globalRecoveryManager;
+
+        
Logger.getLogger("org.apache.asterix").setLevel(INSTANCE.externalProperties.getLogLevel());
+        
Logger.getLogger("org.apache.hyracks").setLevel(INSTANCE.externalProperties.getLogLevel());
+    }
+
+    public boolean initialized() {
+        return initialized;
+    }
+
+    @Override
+    public ICCApplicationContext getCCApplicationContext() {
+        return appCtx;
+    }
+
+    @Override
+    public StorageProperties getStorageProperties() {
+        return storageProperties;
+    }
+
+    @Override
+    public TransactionProperties getTransactionProperties() {
+        return txnProperties;
+    }
+
+    @Override
+    public CompilerProperties getCompilerProperties() {
+        return compilerProperties;
+    }
+
+    @Override
+    public MetadataProperties getMetadataProperties() {
+        return metadataProperties;
+    }
+
+    @Override
+    public ExternalProperties getExternalProperties() {
+        return externalProperties;
+    }
+
+    @Override
+    public FeedProperties getFeedProperties() {
+        return feedProperties;
+    }
+
+    @Override
+    public BuildProperties getBuildProperties() {
+        return buildProperties;
+    }
+
+    public IHyracksClientConnection getHcc() {
+        return hcc;
+    }
+
+    @Override
+    public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
+        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
+    }
+
+    @Override
+    public IStorageManager getStorageManager() {
+        return RuntimeComponentsProvider.RUNTIME_PROVIDER;
+    }
+
+    @Override
+    public ReplicationProperties getReplicationProperties() {
+        return replicationProperties;
+    }
+
+    @Override
+    public IGlobalRecoveryManager getGlobalRecoveryManager() {
+        return globalRecoveryManager;
+    }
+
+    @Override
+    public ILibraryManager getLibraryManager() {
+        return libraryManager;
+    }
+
+    public Object getExtensionManager() {
+        return extensionManager;
+    }
+
+    public void setExtensionManager(Object extensionManager) {
+        this.extensionManager = extensionManager;
+    }
+
+    public ExtensionProperties getExtensionProperties() {
+        return extensionProperties;
+    }
+
+    @Override
+    public MessagingProperties getMessagingProperties() {
+        return messagingProperties;
+    }
+
+    public IResourceIdManager getResourceIdManager() {
+        return resourceIdManager;
+    }
+
+    public IMetadataBootstrap getMetadataBootstrap() {
+        return metadataBootstrapSupplier.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
new file mode 100644
index 0000000..8fffdb1
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import 
org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * A holder class for properties related to the Asterix cluster.
+ */
+
+public class ClusterStateManager {
+    /*
+     * TODO: currently after instance restarts we require all nodes to join 
again,
+     * otherwise the cluster wont be ACTIVE. we may overcome this by storing 
the cluster state before the instance
+     * shutdown and using it on startup to identify the nodes that are 
expected the join.
+     */
+
+    private static final Logger LOGGER = 
Logger.getLogger(ClusterStateManager.class.getName());
+    public static final ClusterStateManager INSTANCE = new 
ClusterStateManager();
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = 
"cluster-net-ip-address";
+    private static final String IO_DEVICES = "iodevices";
+    private Map<String, Map<String, String>> activeNcConfiguration = new 
HashMap<>();
+
+    private final Cluster cluster;
+    private ClusterState state = ClusterState.UNUSABLE;
+
+    private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
+
+    private boolean globalRecoveryCompleted = false;
+
+    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+    private Map<Long, TakeoverPartitionsRequestMessage> 
pendingTakeoverRequests = null;
+
+    private long clusterRequestId = 0;
+    private String currentMetadataNode = null;
+    private boolean metadataNodeActive = false;
+    private boolean autoFailover = false;
+    private boolean replicationEnabled = false;
+    private Set<String> failedNodes = new HashSet<>();
+    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
+    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+
+    private ClusterStateManager() {
+        cluster = ClusterProperties.INSTANCE.getCluster();
+        // if this is the CC process
+        if (AppContextInfo.INSTANCE.initialized()
+                && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+            node2PartitionsMap = 
AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
+            clusterPartitions = 
AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
+            currentMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+            replicationEnabled = 
ClusterProperties.INSTANCE.isReplicationEnabled();
+            autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
+            if (autoFailover) {
+                pendingTakeoverRequests = new HashMap<>();
+                pendingProcessingFailbackPlans = new LinkedList<>();
+                planId2FailbackPlanMap = new HashMap<>();
+            }
+        }
+    }
+
+    public synchronized void removeNCConfiguration(String nodeId) throws 
HyracksException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Removing configuration parameters for node id " + 
nodeId);
+        }
+        activeNcConfiguration.remove(nodeId);
+
+        //if this node was waiting for failback and failed before it completed
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                notifyFailbackPlansNodeFailure(nodeId);
+                revertFailedFailbackPlanEffects();
+            }
+        } else {
+            //an active node failed
+            failedNodes.add(nodeId);
+            if (nodeId.equals(currentMetadataNode)) {
+                metadataNodeActive = false;
+                LOGGER.info("Metadata node is now inactive");
+            }
+            updateNodePartitions(nodeId, false);
+            if (replicationEnabled) {
+                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+                if (autoFailover) {
+                    notifyFailbackPlansNodeFailure(nodeId);
+                    requestPartitionsTakeover(nodeId);
+                }
+            }
+        }
+    }
+
+    public synchronized void addNCConfiguration(String nodeId, Map<String, 
String> configuration)
+            throws HyracksException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registering configuration parameters for node id " + 
nodeId);
+        }
+        activeNcConfiguration.put(nodeId, configuration);
+
+        //a node trying to come back after failure
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                prepareFailbackPlan(nodeId);
+                return;
+            } else {
+                //a node completed local or remote recovery and rejoined
+                failedNodes.remove(nodeId);
+                if (replicationEnabled) {
+                    //notify other replica to reconnect to this node
+                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+                }
+            }
+        }
+
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = true;
+            LOGGER.info("Metadata node is now active");
+        }
+        updateNodePartitions(nodeId, true);
+    }
+
+    private synchronized void updateNodePartitions(String nodeId, boolean 
added) throws HyracksDataException {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
+        // if this isn't a storage node, it will not have cluster partitions
+        if (nodePartitions != null) {
+            for (ClusterPartition p : nodePartitions) {
+                // set the active node for this node's partitions
+                p.setActive(added);
+                if (added) {
+                    p.setActiveNodeId(nodeId);
+                }
+            }
+            resetClusterPartitionConstraint();
+            updateClusterState();
+        }
+    }
+
+    private synchronized void updateClusterState() throws HyracksDataException 
{
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (!p.isActive()) {
+                state = ClusterState.UNUSABLE;
+                LOGGER.info("Cluster is in UNUSABLE state");
+                return;
+            }
+        }
+        // if all storage partitions are active as well as the metadata node, 
then the cluster is active
+        if (metadataNodeActive) {
+            state = ClusterState.PENDING;
+            LOGGER.info("Cluster is now " + state);
+            AppContextInfo.INSTANCE.getMetadataBootstrap().init();
+            state = ClusterState.ACTIVE;
+            LOGGER.info("Cluster is now " + state);
+            // start global recovery
+            
AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
+            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
+                processPendingFailbackPlans();
+            }
+        } else {
+            requestMetadataNodeTakeover();
+        }
+    }
+
+    /**
+     * Returns the IO devices configured for a Node Controller
+     *
+     * @param nodeId
+     *            unique identifier of the Node Controller
+     * @return a list of IO devices.
+     */
+    public synchronized String[] getIODevices(String nodeId) {
+        Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
+        if (ncConfig == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Configuration parameters for nodeId " + nodeId
+                        + " not found. The node has not joined yet or has 
left.");
+            }
+            return new String[0];
+        }
+        return ncConfig.get(IO_DEVICES).split(",");
+    }
+
+    public ClusterState getState() {
+        return state;
+    }
+
+    public synchronized Node getAvailableSubstitutionNode() {
+        List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : 
cluster.getSubstituteNodes().getNode();
+        return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
+    }
+
+    public synchronized Set<String> getParticipantNodes() {
+        Set<String> participantNodes = new HashSet<>();
+        for (String pNode : activeNcConfiguration.keySet()) {
+            participantNodes.add(pNode);
+        }
+        return participantNodes;
+    }
+
+    public synchronized AlgebricksAbsolutePartitionConstraint 
getClusterLocations() {
+        if (clusterPartitionConstraint == null) {
+            resetClusterPartitionConstraint();
+        }
+        return clusterPartitionConstraint;
+    }
+
+    private synchronized void resetClusterPartitionConstraint() {
+        ArrayList<String> clusterActiveLocations = new ArrayList<>();
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (p.isActive()) {
+                clusterActiveLocations.add(p.getActiveNodeId());
+            }
+        }
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[] {}));
+    }
+
+    public boolean isGlobalRecoveryCompleted() {
+        return globalRecoveryCompleted;
+    }
+
+    public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
+        this.globalRecoveryCompleted = globalRecoveryCompleted;
+    }
+
+    public boolean isClusterActive() {
+        if (cluster == null) {
+            // this is a virtual cluster
+            return true;
+        }
+        return state == ClusterState.ACTIVE;
+    }
+
+    public static int getNumberOfNodes() {
+        return 
AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
+    }
+
+    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+        return node2PartitionsMap.get(nodeId);
+    }
+
+    public synchronized int getNodePartitionsCount(String node) {
+        if (node2PartitionsMap.containsKey(node)) {
+            return node2PartitionsMap.get(node).length;
+        }
+        return 0;
+    }
+
+    public synchronized ClusterPartition[] getClusterPartitons() {
+        ArrayList<ClusterPartition> partitons = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            partitons.add(partition);
+        }
+        return partitons.toArray(new ClusterPartition[] {});
+    }
+
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = 
getNodeAssignedPartitions(failedNodeId);
+        if (!lostPartitions.isEmpty()) {
+            for (ClusterPartition partition : lostPartitions) {
+                //find replicas for this partitions
+                Set<String> partitionReplicas = 
replicationProperties.getNodeReplicasIds(partition.getNodeId());
+                //find a replica that is still active
+                for (String replica : partitionReplicas) {
+                    //TODO (mhubail) currently this assigns the partition to 
the first found active replica.
+                    //It needs to be modified to consider load balancing.
+                    if (addActiveReplica(replica, partition, 
partitionRecoveryPlan)) {
+                        break;
+                    }
+                }
+            }
+
+            if (partitionRecoveryPlan.size() == 0) {
+                //no active replicas were found for the failed node
+                LOGGER.severe("Could not find active replicas for the 
partitions " + lostPartitions);
+                return;
+            } else {
+                LOGGER.info("Partitions to recover: " + lostPartitions);
+            }
+            ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
+                    .getMessageBroker();
+            //For each replica, send a request to takeover the assigned 
partitions
+            for (Entry<String, List<Integer>> entry : 
partitionRecoveryPlan.entrySet()) {
+                String replica = entry.getKey();
+                Integer[] partitionsToTakeover = entry.getValue().toArray(new 
Integer[entry.getValue().size()]);
+                long requestId = clusterRequestId++;
+                TakeoverPartitionsRequestMessage takeoverRequest = new 
TakeoverPartitionsRequestMessage(requestId,
+                        replica, partitionsToTakeover);
+                pendingTakeoverRequests.put(requestId, takeoverRequest);
+                try {
+                    messageBroker.sendApplicationMessageToNC(takeoverRequest, 
replica);
+                } catch (Exception e) {
+                    /**
+                     * if we fail to send the request, it means the NC we 
tried to send the request to
+                     * has failed. When the failure notification arrives, we 
will send any pending request
+                     * that belongs to the failed NC to a different active 
replica.
+                     */
+                    LOGGER.log(Level.WARNING, "Failed to send takeover 
request: " + takeoverRequest, e);
+                }
+            }
+        }
+    }
+
+    private boolean addActiveReplica(String replica, ClusterPartition 
partition,
+            Map<String, List<Integer>> partitionRecoveryPlan) {
+        if (activeNcConfiguration.containsKey(replica) && 
!failedNodes.contains(replica)) {
+            if (!partitionRecoveryPlan.containsKey(replica)) {
+                List<Integer> replicaPartitions = new ArrayList<>();
+                replicaPartitions.add(partition.getPartitionId());
+                partitionRecoveryPlan.put(replica, replicaPartitions);
+            } else {
+                
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private synchronized List<ClusterPartition> 
getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> nodePartitions = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                nodePartitions.add(partition);
+            }
+        }
+        /**
+         * if there is any pending takeover request that this node was 
supposed to handle,
+         * it needs to be sent to a different replica
+         */
+        List<Long> failedTakeoverRequests = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage request : 
pendingTakeoverRequests.values()) {
+            if (request.getNodeId().equals(nodeId)) {
+                for (Integer partitionId : request.getPartitions()) {
+                    nodePartitions.add(clusterPartitions.get(partitionId));
+                }
+                failedTakeoverRequests.add(request.getRequestId());
+            }
+        }
+
+        //remove failed requests
+        for (Long requestId : failedTakeoverRequests) {
+            pendingTakeoverRequests.remove(requestId);
+        }
+        return nodePartitions;
+    }
+
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartiton = 
AppContextInfo.INSTANCE.getMetadataProperties()
+                .getMetadataPartition();
+        //request the metadataPartition node to register itself as the 
metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new 
TakeoverMetadataNodeRequestMessage();
+        ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
+                .getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, 
metadataPartiton.getActiveNodeId());
+        } catch (Exception e) {
+            /**
+             * if we fail to send the request, it means the NC we tried to 
send the request to
+             * has failed. When the failure notification arrives, a new NC 
will be assigned to
+             * the metadata partition and a new metadata node takeover request 
will be sent to it.
+             */
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + 
metadataPartiton.getActiveNodeId(), e);
+        }
+    }
+
+    public synchronized void 
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
+            throws HyracksDataException {
+        for (Integer partitonId : response.getPartitions()) {
+            ClusterPartition partition = clusterPartitions.get(partitonId);
+            partition.setActive(true);
+            partition.setActiveNodeId(response.getNodeId());
+        }
+        pendingTakeoverRequests.remove(response.getRequestId());
+        resetClusterPartitionConstraint();
+        updateClusterState();
+    }
+
+    public synchronized void 
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage 
response)
+            throws HyracksDataException {
+        currentMetadataNode = response.getNodeId();
+        metadataNodeActive = true;
+        LOGGER.info("Current metadata node: " + currentMetadataNode);
+        updateClusterState();
+    }
+
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> nodeReplicas = 
replicationProperties.getNodeReplicationClients(failingBackNodeId);
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = 
node2PartitionsMap.get(replicaId);
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /**
+                 * if the partition original node is the returning node,
+                 * add it to the list of the partitions which will be failed 
back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), 
partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
+    }
+
+    private synchronized void processPendingFailbackPlans() {
+        /**
+         * if the cluster state is not ACTIVE, then failbacks should not be 
processed
+         * since some partitions are not active
+         */
+        if (state == ClusterState.ACTIVE) {
+            while (!pendingProcessingFailbackPlans.isEmpty()) {
+                //take the first pending failback plan
+                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+                /**
+                 * A plan at this stage will be in one of two states:
+                 * 1. PREPARING -> the participants were selected but we 
haven't sent any request.
+                 * 2. PENDING_ROLLBACK -> a participant failed before we send 
any requests
+                 */
+                if (plan.getState() == FailbackPlanState.PREPARING) {
+                    //set the partitions that will be failed back as inactive
+                    String failbackNode = plan.getNodeId();
+                    for (Integer partitionId : plan.getPartitionsToFailback()) 
{
+                        ClusterPartition clusterPartition = 
clusterPartitions.get(partitionId);
+                        clusterPartition.setActive(false);
+                        //partition expected to be returned to the failing 
back node
+                        clusterPartition.setActiveNodeId(failbackNode);
+                    }
+
+                    /**
+                     * if the returning node is the original metadata node,
+                     * then metadata node will change after the failback 
completes
+                     */
+                    String originalMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties()
+                            .getMetadataNodeName();
+                    if (originalMetadataNode.equals(failbackNode)) {
+                        
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                        currentMetadataNode = "";
+                        metadataNodeActive = false;
+                    }
+
+                    //force new jobs to wait
+                    state = ClusterState.REBALANCING;
+                    ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE
+                            .getCCApplicationContext().getMessageBroker();
+                    handleFailbackRequests(plan, messageBroker);
+                    /**
+                     * wait until the current plan is completed before 
processing the next plan.
+                     * when the current one completes or is reverted, the 
cluster state will be
+                     * ACTIVE again, and the next failback plan (if any) will 
be processed.
+                     */
+                    break;
+                } else if (plan.getState() == 
FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing 
to rollback
+                    planId2FailbackPlanMap.remove(plan.getPlanId());
+                }
+            }
+        }
+    }
+
+    private void handleFailbackRequests(NodeFailbackPlan plan, 
ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete on-going jobs and prepare 
partitions for failback
+        for (PreparePartitionsFailbackRequestMessage request : 
plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeID());
+                plan.addPendingRequest(request);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: 
" + request.getNodeID(), e);
+                plan.notifyNodeFailure(request.getNodeID());
+                revertFailedFailbackPlanEffects();
+                break;
+            }
+        }
+    }
+
+    public synchronized void 
processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage
 msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        plan.markRequestCompleted(msg.getRequestId());
+        /**
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still 
expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send 
completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the 
final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = 
plan.getCompleteFailbackRequestMessage();
+
+            //send complete resync and takeover partitions to the failing back 
node
+            ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
+                    .getMessageBroker();
+            try {
+                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback 
request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    public synchronized void 
processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
+            throws HyracksDataException {
+        /**
+         * the failback plan completed successfully:
+         * Remove all references to it.
+         * Remove the the failing back node from the failed nodes list.
+         * Notify its replicas to reconnect to it.
+         * Set the failing back node partitions as active.
+         */
+        NodeFailbackPlan plan = 
planId2FailbackPlanMap.remove(response.getPlanId());
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+        updateNodePartitions(nodeId, true);
+    }
+
+    private synchronized void notifyImpactedReplicas(String nodeId, 
ClusterEventType event) {
+        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> remoteReplicas = 
replicationProperties.getRemoteReplicasIds(nodeId);
+        String nodeIdAddress = "";
+        //in case the node joined with a new IP address, we need to send it to 
the other replicas
+        if (event == ClusterEventType.NODE_JOIN) {
+            nodeIdAddress = 
activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+        }
+
+        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, 
nodeIdAddress, event);
+        ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
+                .getMessageBroker();
+        for (String replica : remoteReplicas) {
+            //if the remote replica is alive, send the event
+            if (activeNcConfiguration.containsKey(replica)) {
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application 
message to an NC", e);
+                }
+            }
+        }
+    }
+
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                //TODO if the failing back node is still active, notify it to 
construct a new plan for it
+                iterator.remove();
+
+                //reassign the partitions that were supposed to be failed back 
to an active replica
+                requestPartitionsTakeover(plan.getNodeId());
+            }
+        }
+    }
+
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            plan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    public synchronized boolean isMetadataNodeActive() {
+        return metadataNodeActive;
+    }
+
+    public synchronized ObjectNode getClusterStateDescription()  {
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode stateDescription = om.createObjectNode();
+        stateDescription.put("state", state.name());
+        stateDescription.put("metadata_node", currentMetadataNode);
+        ArrayNode ncs = om.createArrayNode();
+        stateDescription.set("ncs",ncs);
+        for (Map.Entry<String, ClusterPartition[]> entry : 
node2PartitionsMap.entrySet()) {
+            ObjectNode nodeJSON = om.createObjectNode();
+            nodeJSON.put("node_id", entry.getKey());
+            boolean allActive = true;
+            boolean anyActive = false;
+            Set<Map<String, Object>> partitions = new HashSet<>();
+            for (ClusterPartition part : entry.getValue()) {
+                HashMap<String, Object> partition = new HashMap<>();
+                partition.put("partition_id", "partition_" + 
part.getPartitionId());
+                partition.put("active", part.isActive());
+                partitions.add(partition);
+                allActive = allActive && part.isActive();
+                if (allActive) {
+                    anyActive = true;
+                }
+            }
+            nodeJSON.put("state", failedNodes.contains(entry.getKey()) ? 
"FAILED"
+                    : allActive ? "ACTIVE"
+                    : anyActive ? "PARTIALLY_ACTIVE"
+                    : "INACTIVE");
+            nodeJSON.putPOJO("partitions", partitions);
+            ncs.add(nodeJSON);
+        }
+        return stateDescription;
+    }
+
+    public synchronized ObjectNode getClusterStateSummary() {
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode stateDescription = om.createObjectNode();
+        stateDescription.put("state", state.name());
+        stateDescription.putPOJO("metadata_node", currentMetadataNode);
+        stateDescription.putPOJO("partitions", clusterPartitions);
+        return stateDescription;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
new file mode 100644
index 0000000..117fa9e
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.utils;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.file.IFileMapProvider;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+public class RuntimeComponentsProvider implements 
IIndexLifecycleManagerProvider, IStorageManager,
+        ILSMIOOperationSchedulerProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final RuntimeComponentsProvider RUNTIME_PROVIDER = new 
RuntimeComponentsProvider();
+
+    private RuntimeComponentsProvider() {
+    }
+
+    @Override
+    public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
+        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getLSMIOScheduler();
+    }
+
+    @Override
+    public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
+        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getBufferCache();
+    }
+
+    @Override
+    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
+        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getFileMapManager();
+    }
+
+    @Override
+    public ILocalResourceRepository 
getLocalResourceRepository(IHyracksTaskContext ctx) {
+        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getLocalResourceRepository();
+    }
+
+    @Override
+    public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext 
ctx) {
+        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getDatasetLifecycleManager();
+    }
+
+    @Override
+    public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getResourceIdFactory();
+    }
+
+}

Reply via email to