http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 0dcdc7b..fcc997f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -27,13 +27,14 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
@@ -41,6 +42,7 @@ import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
@@ -61,8 +63,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
         //1. identify which replicas reside in this node
         String localNodeId = runtimeContext.getTransactionSubsystem().getId();
 
-        Set<String> nodes = 
replicationProperties.getNodeReplicationClients(localNodeId);
-
+        Set<String> nodes = 
replicationProperties.getNodeReplicasIds(localNodeId);
         Map<String, Set<String>> recoveryCandidates = new HashMap<>();
         Map<String, Integer> candidatesScore = new HashMap<>();
 
@@ -124,16 +125,9 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
     }
 
     @Override
-    public void takeoverPartitons(Integer[] partitions) throws IOException, 
ACIDException {
-        /**
-         * TODO even though the takeover is always expected to succeed,
-         * in case of any failure during the takeover, the CC should be
-         * notified that the takeover failed.
-         */
-        Set<Integer> partitionsToTakeover = new 
HashSet<>(Arrays.asList(partitions));
+    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean 
flush) throws HyracksDataException {
         ILogManager logManager = 
runtimeContext.getTransactionSubsystem().getLogManager();
-
-        long minLSN = 
runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitionsToTakeover);
+        long minLSN = 
runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
         long readableSmallestLSN = logManager.getReadableSmallestLSN();
         if (minLSN < readableSmallestLSN) {
             minLSN = readableSmallestLSN;
@@ -141,7 +135,25 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
         //replay logs > minLSN that belong to these partitions
         IRecoveryManager recoveryManager = 
runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        recoveryManager.replayPartitionsLogs(partitionsToTakeover, 
logManager.getLogReader(true), minLSN);
+        try {
+            recoveryManager.replayPartitionsLogs(partitions, 
logManager.getLogReader(true), minLSN);
+            if (flush) {
+                runtimeContext.getDatasetLifecycleManager().flushAllDatasets();
+            }
+        } catch (IOException | ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void takeoverPartitons(Integer[] partitions) throws IOException, 
ACIDException {
+        /**
+         * TODO even though the takeover is always expected to succeed,
+         * in case of any failure during the takeover, the CC should be
+         * notified that the takeover failed.
+         */
+        Set<Integer> partitionsToTakeover = new 
HashSet<>(Arrays.asList(partitions));
+        replayReplicaPartitionLogs(partitionsToTakeover, false);
 
         //mark these partitions as active in this node
         PersistentLocalResourceRepository resourceRepository = 
(PersistentLocalResourceRepository) runtimeContext
@@ -157,8 +169,9 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
         PersistentLocalResourceRepository resourceRepository = 
(PersistentLocalResourceRepository) runtimeContext
                 .getLocalResourceRepository();
         IDatasetLifecycleManager datasetLifeCycleManager = 
runtimeContext.getDatasetLifecycleManager();
+        Map<String, ClusterPartition[]> nodePartitions = 
((IPropertiesProvider) runtimeContext).getMetadataProperties()
+                .getNodePartitions();
 
-        failbackRecoveryReplicas = new HashMap<>();
         while (true) {
             //start recovery steps
             try {
@@ -189,10 +202,15 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 /*** Start Recovery Per Lost Replica ***/
                 for (Entry<String, Set<String>> remoteReplica : 
failbackRecoveryReplicas.entrySet()) {
                     String replicaId = remoteReplica.getKey();
-                    Set<String> partitionsToRecover = remoteReplica.getValue();
+                    Set<String> ncsToRecoverFor = remoteReplica.getValue();
+                    Set<Integer> partitionsIds = new HashSet<>();
+                    for (String node : ncsToRecoverFor) {
+                        
partitionsIds.addAll((Arrays.asList(nodePartitions.get(node))).stream()
+                                
.map(ClusterPartition::getPartitionId).collect(Collectors.toList()));
+                    }
 
                     //1. Request indexes metadata and LSM components
-                    replicationManager.requestReplicaFiles(replicaId, 
partitionsToRecover, new HashSet<String>());
+                    replicationManager.requestReplicaFiles(replicaId, 
partitionsIds, new HashSet<String>());
                 }
                 break;
             } catch (IOException e) {
@@ -209,8 +227,8 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
         ILogManager logManager = 
runtimeContext.getTransactionSubsystem().getLogManager();
         ReplicaResourcesManager replicaResourcesManager = 
(ReplicaResourcesManager) runtimeContext
                 .getReplicaResourcesManager();
-        Map<String, ClusterPartition[]> nodePartitions = 
((IPropertiesProvider) runtimeContext)
-                .getMetadataProperties().getNodePartitions();
+        Map<String, ClusterPartition[]> nodePartitions = 
((IPropertiesProvider) runtimeContext).getMetadataProperties()
+                .getNodePartitions();
 
         /**
          * for each lost partition, get the remaining files from replicas
@@ -221,17 +239,19 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 String replicaId = remoteReplica.getKey();
                 Set<String> NCsDataToRecover = remoteReplica.getValue();
                 Set<String> existingFiles = new HashSet<>();
+                Set<Integer> partitionsToRecover = new HashSet<>();
                 for (String nodeId : NCsDataToRecover) {
                     //get partitions that will be recovered from this node
                     ClusterPartition[] replicaPartitions = 
nodePartitions.get(nodeId);
                     for (ClusterPartition partition : replicaPartitions) {
                         existingFiles.addAll(
                                 
replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), 
true));
+                        partitionsToRecover.add(partition.getPartitionId());
                     }
                 }
 
                 //Request remaining indexes files
-                replicationManager.requestReplicaFiles(replicaId, 
NCsDataToRecover, existingFiles);
+                replicationManager.requestReplicaFiles(replicaId, 
partitionsToRecover, existingFiles);
             }
         } catch (IOException e) {
             /**
@@ -256,4 +276,52 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
         failbackRecoveryReplicas = null;
     }
-}
+
+    //TODO refactor common code between remote recovery and failback process
+    @Override
+    public void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) 
throws HyracksDataException {
+        int maxRecoveryAttempts = 
replicationProperties.getMaxRemoteRecoveryAttempts();
+        PersistentLocalResourceRepository resourceRepository = 
(PersistentLocalResourceRepository) runtimeContext
+                .getLocalResourceRepository();
+        IDatasetLifecycleManager datasetLifeCycleManager = 
runtimeContext.getDatasetLifecycleManager();
+        ILogManager logManager = 
runtimeContext.getTransactionSubsystem().getLogManager();
+        while (true) {
+            //start recovery steps
+            try {
+                if (maxRecoveryAttempts <= 0) {
+                    //to avoid infinite loop in case of unexpected behavior.
+                    throw new IllegalStateException("Failed to perform remote 
recovery.");
+                }
+
+                /*** Prepare for Recovery ***/
+                //1. clean any memory data that could've existed from previous 
failed recovery attempt
+                datasetLifeCycleManager.closeAllDatasets();
+
+                //2. remove any existing storage data and initialize storage 
metadata
+                resourceRepository.deleteStorageData(true);
+                
resourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
+
+                /*** Start Recovery Per Lost Replica ***/
+                for (Entry<String, Set<Integer>> remoteReplica : 
recoveryPlan.entrySet()) {
+                    String replicaId = remoteReplica.getKey();
+                    Set<Integer> partitionsToRecover = 
remoteReplica.getValue();
+
+                    //Request indexes metadata and LSM components
+                    replicationManager.requestReplicaFiles(replicaId, 
partitionsToRecover, new HashSet<String>());
+                }
+
+                //get max LSN from selected remote replicas
+                long maxRemoteLSN = 
replicationManager.getMaxRemoteLSN(recoveryPlan.keySet());
+
+                //6. force LogManager to start from a partition > maxLSN in 
selected remote replicas
+                logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
+                break;
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Failed during remote recovery. 
Attempting again...", e);
+                }
+                maxRecoveryAttempts--;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index cce3dc4..6a2ebf6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,8 +38,8 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
@@ -285,7 +285,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
                             partitionFiles.add(file.getAbsolutePath());
                         } else {
                             partitionFiles.add(
-                                    
PersistentLocalResourceRepository.getResourceRelativePath(file.getAbsolutePath()));
+                                    
StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
index 7c386d5..b2855f6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -18,9 +18,9 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
 
-public abstract class AbstractFailbackPlanMessage implements 
IApplicationMessage {
+public abstract class AbstractFailbackPlanMessage implements 
INCLifecycleMessage {
 
     private static final long serialVersionUID = 1L;
     protected final long planId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
deleted file mode 100644
index 2454800..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class CompleteFailbackRequestMessage extends 
AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = 
Logger.getLogger(CompleteFailbackRequestMessage.class.getName());
-    private final Set<Integer> partitions;
-    private final String nodeId;
-
-    public CompleteFailbackRequestMessage(long planId, int requestId, String 
nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeId = nodeId;
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
-        sb.append(" Plan ID: " + planId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: " + partitions);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, 
InterruptedException {
-        NodeControllerService ncs = (NodeControllerService) cs;
-        IAppRuntimeContext appContext =
-                (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
-        INCMessageBroker broker = (INCMessageBroker) 
ncs.getApplicationContext().getMessageBroker();
-        HyracksDataException hde = null;
-        try {
-            IRemoteRecoveryManager remoteRecoeryManager = 
appContext.getRemoteRecoveryManager();
-            remoteRecoeryManager.completeFailbackProcess();
-        } catch (IOException | InterruptedException e) {
-            LOGGER.log(Level.SEVERE, "Failure during completion of failback 
process", e);
-            hde = ExceptionUtils.convertToHyracksDataException(e);
-        } finally {
-            CompleteFailbackResponseMessage reponse = new 
CompleteFailbackResponseMessage(planId,
-                    requestId, partitions);
-            try {
-                broker.sendMessageToCC(reponse);
-            } catch (Exception e) {
-                LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
-                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
-            }
-        }
-        if (hde != null) {
-            throw hde;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
deleted file mode 100644
index 07c366c..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.util.Set;
-
-import org.apache.asterix.runtime.utils.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class CompleteFailbackResponseMessage extends 
AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-
-    public CompleteFailbackResponseMessage(long planId, int requestId, 
Set<Integer> partitions) {
-        super(planId, requestId);
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(CompleteFailbackResponseMessage.class.getSimpleName());
-        sb.append(" Plan ID: " + planId);
-        sb.append(" Partitions: " + partitions);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, 
InterruptedException {
-        ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
deleted file mode 100644
index 24f5fe8..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class NodeFailbackPlan {
-
-    public enum FailbackPlanState {
-        /**
-         * Initial state while selecting the nodes that will participate
-         * in the node failback plan.
-         */
-        PREPARING,
-        /**
-         * Once a pending {@link PreparePartitionsFailbackRequestMessage} 
request is added,
-         * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE 
to indicate
-         * a response is expected and need to wait for it.
-         */
-        PENDING_PARTICIPANT_REPONSE,
-        /**
-         * Upon receiving the last {@link 
PreparePartitionsFailbackResponseMessage} response,
-         * the state changes from PENDING_PARTICIPANT_REPONSE to 
PENDING_COMPLETION to indicate
-         * the need to send {@link CompleteFailbackRequestMessage} to the 
failing back node.
-         */
-        PENDING_COMPLETION,
-        /**
-         * if any of the participants fail or the failing back node itself 
fails during
-         * and of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, 
PENDING_COMPLETION),
-         * the state is changed to FAILED.
-         */
-        FAILED,
-        /**
-         * if the state is FAILED, and all pending responses (if any) have 
been received,
-         * the state changes from FAILED to PENDING_ROLLBACK to indicate the 
need to revert
-         * the effects of this plan (if any).
-         */
-        PENDING_ROLLBACK
-    }
-
-    private static long planIdGenerator = 0;
-    private long planId;
-    private final String nodeId;
-    private final Set<String> participants;
-    private final Map<Integer, String> partition2nodeMap;
-    private String nodeToReleaseMetadataManager;
-    private int requestId;
-    private Map<Integer, PreparePartitionsFailbackRequestMessage> 
pendingRequests;
-    private FailbackPlanState state;
-
-    public static NodeFailbackPlan createPlan(String nodeId) {
-        return new NodeFailbackPlan(planIdGenerator++, nodeId);
-    }
-
-    private NodeFailbackPlan(long planId, String nodeId) {
-        this.planId = planId;
-        this.nodeId = nodeId;
-        participants = new HashSet<>();
-        partition2nodeMap = new HashMap<>();
-        pendingRequests = new HashMap<>();
-        state = FailbackPlanState.PREPARING;
-    }
-
-    public synchronized void addPartitionToFailback(int partitionId, String 
currentActiveNode) {
-        partition2nodeMap.put(partitionId, currentActiveNode);
-    }
-
-    public synchronized void addParticipant(String nodeId) {
-        participants.add(nodeId);
-    }
-
-    public synchronized void notifyNodeFailure(String failedNode) {
-        if (participants.contains(failedNode)) {
-            if (state == FailbackPlanState.PREPARING) {
-                state = FailbackPlanState.FAILED;
-            } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) 
{
-                /**
-                 * if there is any pending request from this failed node,
-                 * it should be marked as completed and the plan should be 
marked as failed
-                 */
-                Set<Integer> failedRequests = new HashSet<>();
-                for (PreparePartitionsFailbackRequestMessage request : 
pendingRequests.values()) {
-                    if (request.getNodeID().equals(failedNode)) {
-                        failedRequests.add(request.getRequestId());
-                    }
-                }
-
-                if (!failedRequests.isEmpty()) {
-                    state = FailbackPlanState.FAILED;
-                    for (Integer failedRequestId : failedRequests) {
-                        markRequestCompleted(failedRequestId);
-                    }
-                }
-            }
-        } else if (nodeId.equals(failedNode)) {
-            //if the failing back node is the failed node itself
-            state = FailbackPlanState.FAILED;
-            updateState();
-        }
-    }
-
-    public synchronized Set<Integer> getPartitionsToFailback() {
-        return new HashSet<>(partition2nodeMap.keySet());
-    }
-
-    public synchronized void 
addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
-        //if this is the first request
-        if (pendingRequests.size() == 0) {
-            state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
-        }
-        pendingRequests.put(msg.getRequestId(), msg);
-    }
-
-    public synchronized void markRequestCompleted(int requestId) {
-        pendingRequests.remove(requestId);
-        updateState();
-    }
-
-    private void updateState() {
-        if (pendingRequests.size() == 0) {
-            switch (state) {
-                case PREPARING:
-                case FAILED:
-                    state = FailbackPlanState.PENDING_ROLLBACK;
-                    break;
-                case PENDING_PARTICIPANT_REPONSE:
-                    state = FailbackPlanState.PENDING_COMPLETION;
-                    break;
-                default:
-                    break;
-            }
-        }
-    }
-
-    public synchronized Set<PreparePartitionsFailbackRequestMessage> 
getPlanFailbackRequests() {
-        Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new 
HashSet<>();
-        /**
-         * for each participant, construct a request with the partitions
-         * that will be failed back or flushed.
-         */
-        for (String participant : participants) {
-            Set<Integer> partitionToPrepareForFailback = new HashSet<>();
-            for (Map.Entry<Integer, String> entry : 
partition2nodeMap.entrySet()) {
-                if (entry.getValue().equals(participant)) {
-                    partitionToPrepareForFailback.add(entry.getKey());
-                }
-            }
-            PreparePartitionsFailbackRequestMessage msg = new 
PreparePartitionsFailbackRequestMessage(planId,
-                    requestId++, participant, partitionToPrepareForFailback);
-            if (participant.equals(nodeToReleaseMetadataManager)) {
-                msg.setReleaseMetadataNode(true);
-            }
-            node2Partitions.add(msg);
-        }
-        return node2Partitions;
-    }
-
-    public synchronized CompleteFailbackRequestMessage 
getCompleteFailbackRequestMessage() {
-        return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, 
getPartitionsToFailback());
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public long getPlanId() {
-        return planId;
-    }
-
-    public void setNodeToReleaseMetadataManager(String 
nodeToReleaseMetadataManager) {
-        this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
-    }
-
-    public synchronized FailbackPlanState getState() {
-        return state;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Failing back node: " + nodeId);
-        sb.append(" Participants: " + participants);
-        sb.append(" Partitions to Failback: " + partition2nodeMap.keySet());
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
deleted file mode 100644
index 985c741..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.rmi.RemoteException;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName());
-    private final Set<Integer> partitions;
-    private boolean releaseMetadataNode = false;
-    private final String nodeID;
-
-    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeID = nodeId;
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public boolean isReleaseMetadataNode() {
-        return releaseMetadataNode;
-    }
-
-    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
-        this.releaseMetadataNode = releaseMetadataNode;
-    }
-
-    public String getNodeID() {
-        return nodeID;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName());
-        sb.append(" Plan ID: " + planId);
-        sb.append(" Partitions: " + partitions);
-        sb.append(" releaseMetadataNode: " + releaseMetadataNode);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        NodeControllerService ncs = (NodeControllerService) cs;
-        IAppRuntimeContext appContext =
-                (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
-        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
-        /**
-         * if the metadata partition will be failed back
-         * we need to flush and close all datasets including metadata datasets
-         * otherwise we need to close all non-metadata datasets and flush metadata datasets
-         * so that their memory components will be copied to the failing back node
-         */
-        if (releaseMetadataNode) {
-            appContext.getDatasetLifecycleManager().closeAllDatasets();
-            //remove the metadata node stub from RMI registry
-            try {
-                appContext.unexportMetadataNodeStub();
-            } catch (RemoteException e) {
-                LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e);
-                throw ExceptionUtils.convertToHyracksDataException(e);
-            }
-        } else {
-            //close all non-metadata datasets
-            appContext.getDatasetLifecycleManager().closeUserDatasets();
-            //flush the remaining metadata datasets that were not closed
-            appContext.getDatasetLifecycleManager().flushAllDatasets();
-        }
-
-        //mark the partitions to be closed as inactive
-        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
-                .getLocalResourceRepository();
-        for (Integer partitionId : partitions) {
-            localResourceRepo.addInactivePartition(partitionId);
-        }
-
-        //send response after partitions prepared for failback
-        PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId,
-                requestId, partitions);
-        try {
-            broker.sendMessageToCC(reponse);
-        } catch (Exception e) {
-            LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
-            throw ExceptionUtils.convertToHyracksDataException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
deleted file mode 100644
index c655ecd..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.util.Set;
-
-import org.apache.asterix.runtime.utils.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-
-    public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
-    }
-
-    @Override
-    public String toString() {
-        return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
deleted file mode 100644
index 1e67052..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class TakeoverMetadataNodeRequestMessage implements IApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        NodeControllerService ncs = (NodeControllerService) cs;
-        IAppRuntimeContext appContext =
-                (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
-        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
-        HyracksDataException hde = null;
-        try {
-            appContext.initializeMetadata(false);
-            appContext.exportMetadataNodeStub();
-        } catch (Exception e) {
-            LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
-            hde = new HyracksDataException(e);
-        } finally {
-            TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
-                    appContext.getTransactionSubsystem().getId());
-            try {
-                broker.sendMessageToCC(reponse);
-            } catch (Exception e) {
-                LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
-                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
-            }
-        }
-        if (hde != null) {
-            throw hde;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
deleted file mode 100644
index d8b2136..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class TakeoverMetadataNodeResponseMessage implements IApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final String nodeId;
-
-    public TakeoverMetadataNodeResponseMessage(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this);
-    }
-
-    @Override
-    public String toString() {
-        return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
deleted file mode 100644
index fb8a33b..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class TakeoverPartitionsRequestMessage implements IApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
-    private final Integer[] partitions;
-    private final long requestId;
-    private final String nodeId;
-
-    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
-        this.requestId = requestId;
-        this.nodeId = nodeId;
-        this.partitions = partitionsToTakeover;
-    }
-
-    public Integer[] getPartitions() {
-        return partitions;
-    }
-
-    public long getRequestId() {
-        return requestId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName());
-        sb.append(" Request ID: " + requestId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: ");
-        for (Integer partitionId : partitions) {
-            sb.append(partitionId + ",");
-        }
-        //remove last comma
-        sb.charAt(sb.length() - 1);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        NodeControllerService ncs = (NodeControllerService) cs;
-        IAppRuntimeContext appContext =
-                (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
-        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
-        //if the NC is shutting down, it should ignore takeover partitions request
-        if (!appContext.isShuttingdown()) {
-            HyracksDataException hde = null;
-            try {
-                IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-                remoteRecoeryManager.takeoverPartitons(partitions);
-            } catch (IOException | ACIDException e) {
-                LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
-                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
-            } finally {
-                //send response after takeover is completed
-                TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
-                        appContext.getTransactionSubsystem().getId(), partitions);
-                try {
-                    broker.sendMessageToCC(reponse);
-                } catch (Exception e) {
-                    LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
-                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
-                }
-            }
-            if (hde != null) {
-                throw hde;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
deleted file mode 100644
index 81492c2..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Integer[] partitions;
-    private final String nodeId;
-    private final long requestId;
-
-    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
-        this.requestId = requestId;
-        this.nodeId = nodeId;
-        this.partitions = partitionsToTakeover;
-    }
-
-    public Integer[] getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public long getRequestId() {
-        return requestId;
-    }
-
-    @Override
-    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
-        ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this);
-    }
-
-    @Override
-    public String toString() {
-        return TakeoverPartitionsResponseMessage.class.getSimpleName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
index 355f503..6193931 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
@@ -18,33 +18,23 @@
  */
 package org.apache.asterix.runtime.utils;
 
-import java.io.IOException;
-import java.util.function.Supplier;
-import java.util.logging.Logger;
-
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
-import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.common.config.CompilerProperties;
-import org.apache.asterix.common.config.ExtensionProperties;
-import org.apache.asterix.common.config.ExternalProperties;
-import org.apache.asterix.common.config.FeedProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.PropertiesAccessor;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.config.*;
 import org.apache.asterix.common.dataflow.IApplicationContextInfo;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.common.IStorageManager;
 
+import java.io.IOException;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+
 /*
  * Acts as an holder class for IndexRegistryProvider, AsterixStorageManager
  * instances that are accessed from the NCs. In addition an instance of ICCApplicationContext
@@ -71,13 +61,15 @@ public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvi
     private IHyracksClientConnection hcc;
     private Object extensionManager;
     private volatile boolean initialized = false;
+    private IFaultToleranceStrategy ftStrategy;
 
     private AppContextInfo() {
     }
 
     public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
             ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
-            Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager)
+            Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
+            IFaultToleranceStrategy ftStrategy)
             throws AsterixException, IOException {
         if (INSTANCE.initialized) {
             throw new AsterixException(AppContextInfo.class.getSimpleName() + " has been initialized already");
@@ -98,6 +90,7 @@ public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvi
         INSTANCE.feedProperties = new FeedProperties(propertiesAccessor);
         INSTANCE.extensionProperties = new ExtensionProperties(propertiesAccessor);
         INSTANCE.replicationProperties = new ReplicationProperties(propertiesAccessor);
+        INSTANCE.ftStrategy = ftStrategy;
         INSTANCE.hcc = hcc;
         INSTANCE.buildProperties = new BuildProperties(propertiesAccessor);
         INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor);
@@ -205,4 +198,8 @@ public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvi
     public IMetadataBootstrap getMetadataBootstrap() {
         return metadataBootstrapSupplier.get();
     }
+
+    public IFaultToleranceStrategy getFaultToleranceStrategy() {
+        return ftStrategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 8fffdb1..d975a98 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -19,50 +19,36 @@
 package org.apache.asterix.runtime.utils;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.runtime.message.NodeFailbackPlan;
-import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * A holder class for properties related to the Asterix cluster.
  */
 
-public class ClusterStateManager {
+public class ClusterStateManager implements IClusterStateManager {
     /*
      * TODO: currently after instance restarts we require all nodes to join again,
      * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance
@@ -71,9 +57,8 @@ public class ClusterStateManager {
 
     private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
     public static final ClusterStateManager INSTANCE = new ClusterStateManager();
-    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
     private static final String IO_DEVICES = "iodevices";
-    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
+    private final Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
 
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
@@ -84,32 +69,21 @@ public class ClusterStateManager {
 
     private Map<String, ClusterPartition[]> node2PartitionsMap = null;
     private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
-    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
 
-    private long clusterRequestId = 0;
     private String currentMetadataNode = null;
     private boolean metadataNodeActive = false;
-    private boolean autoFailover = false;
-    private boolean replicationEnabled = false;
     private Set<String> failedNodes = new HashSet<>();
-    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
-    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+    private IFaultToleranceStrategy ftStrategy;
 
     private ClusterStateManager() {
         cluster = ClusterProperties.INSTANCE.getCluster();
         // if this is the CC process
-        if (AppContextInfo.INSTANCE.initialized()
-                && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+        if (AppContextInfo.INSTANCE.initialized() && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
             node2PartitionsMap = AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
             clusterPartitions = AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
             currentMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
-            replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
-            autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
-            if (autoFailover) {
-                pendingTakeoverRequests = new HashMap<>();
-                pendingProcessingFailbackPlans = new LinkedList<>();
-                planId2FailbackPlanMap = new HashMap<>();
-            }
+            ftStrategy = AppContextInfo.INSTANCE.getFaultToleranceStrategy();
+            ftStrategy.bindTo(this);
         }
     }
 
@@ -117,30 +91,8 @@ public class ClusterStateManager {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + nodeId);
         }
-        activeNcConfiguration.remove(nodeId);
-
-        //if this node was waiting for failback and failed before it completed
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                notifyFailbackPlansNodeFailure(nodeId);
-                revertFailedFailbackPlanEffects();
-            }
-        } else {
-            //an active node failed
-            failedNodes.add(nodeId);
-            if (nodeId.equals(currentMetadataNode)) {
-                metadataNodeActive = false;
-                LOGGER.info("Metadata node is now inactive");
-            }
-            updateNodePartitions(nodeId, false);
-            if (replicationEnabled) {
-                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
-                if (autoFailover) {
-                    notifyFailbackPlansNodeFailure(nodeId);
-                    requestPartitionsTakeover(nodeId);
-                }
-            }
-        }
+        failedNodes.add(nodeId);
+        ftStrategy.notifyNodeFailure(nodeId);
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration)
@@ -149,46 +101,51 @@ public class ClusterStateManager {
             LOGGER.info("Registering configuration parameters for node id " + nodeId);
         }
         activeNcConfiguration.put(nodeId, configuration);
+        failedNodes.remove(nodeId);
+        ftStrategy.notifyNodeJoin(nodeId);
+    }
 
-        //a node trying to come back after failure
-        if (failedNodes.contains(nodeId)) {
-            if (autoFailover) {
-                prepareFailbackPlan(nodeId);
-                return;
-            } else {
-                //a node completed local or remote recovery and rejoined
-                failedNodes.remove(nodeId);
-                if (replicationEnabled) {
-                    //notify other replica to reconnect to this node
-                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-                }
-            }
-        }
+    @Override
+    public synchronized void setState(ClusterState state) {
+        this.state = state;
+        LOGGER.info("Cluster State is now " + state.name());
+    }
 
-        if (nodeId.equals(currentMetadataNode)) {
-            metadataNodeActive = true;
-            LOGGER.info("Metadata node is now active");
+    @Override
+    public void updateMetadataNode(String nodeId, boolean active) {
+        currentMetadataNode = nodeId;
+        metadataNodeActive = active;
+        if (active) {
+            LOGGER.info(String.format("Metadata node %s is now active", currentMetadataNode));
         }
-        updateNodePartitions(nodeId, true);
     }
 
-    private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException {
+    @Override
+    public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         // if this isn't a storage node, it will not have cluster partitions
         if (nodePartitions != null) {
             for (ClusterPartition p : nodePartitions) {
-                // set the active node for this node's partitions
-                p.setActive(added);
-                if (added) {
-                    p.setActiveNodeId(nodeId);
-                }
+                updateClusterPartition(p.getPartitionId(), nodeId, active);
             }
-            resetClusterPartitionConstraint();
-            updateClusterState();
         }
     }
 
-    private synchronized void updateClusterState() throws HyracksDataException {
+    @Override
+    public synchronized void updateClusterPartition(Integer partitionNum, String activeNode, boolean active) {
+        ClusterPartition clusterPartition = clusterPartitions.get(partitionNum);
+        if (clusterPartition != null) {
+            // set the active node for this node's partitions
+            clusterPartition.setActive(active);
+            if (active) {
+                clusterPartition.setActiveNodeId(activeNode);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void refreshState() throws HyracksDataException {
+        resetClusterPartitionConstraint();
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 state = ClusterState.UNUSABLE;
@@ -196,20 +153,19 @@ public class ClusterStateManager {
                 return;
             }
         }
+
+        state = ClusterState.PENDING;
+        LOGGER.info("Cluster is now " + state);
+
         // if all storage partitions are active as well as the metadata node, then the cluster is active
         if (metadataNodeActive) {
-            state = ClusterState.PENDING;
-            LOGGER.info("Cluster is now " + state);
             AppContextInfo.INSTANCE.getMetadataBootstrap().init();
             state = ClusterState.ACTIVE;
             LOGGER.info("Cluster is now " + state);
+            // Notify any waiting threads for the cluster to be active.
+            notifyAll();
             // start global recovery
             AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
-            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
-                processPendingFailbackPlans();
-            }
-        } else {
-            requestMetadataNodeTakeover();
         }
     }
 
@@ -232,6 +188,7 @@ public class ClusterStateManager {
         return ncConfig.get(IO_DEVICES).split(",");
     }
 
+    @Override
     public ClusterState getState() {
         return state;
     }
@@ -287,6 +244,7 @@ public class ClusterStateManager {
         return AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
     }
 
+    @Override
     public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
         return node2PartitionsMap.get(nodeId);
     }
@@ -298,6 +256,7 @@ public class ClusterStateManager {
         return 0;
     }
 
+    @Override
     public synchronized ClusterPartition[] getClusterPartitons() {
         ArrayList<ClusterPartition> partitons = new ArrayList<>();
         for (ClusterPartition partition : clusterPartitions.values()) {
@@ -306,331 +265,6 @@ public class ClusterStateManager {
         return partitons.toArray(new ClusterPartition[] {});
     }
 
-    private synchronized void requestPartitionsTakeover(String failedNodeId) {
-        //replica -> list of partitions to takeover
-        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
-
-        //collect the partitions of the failed NC
-        List<ClusterPartition> lostPartitions = 
getNodeAssignedPartitions(failedNodeId);
-        if (!lostPartitions.isEmpty()) {
-            for (ClusterPartition partition : lostPartitions) {
-                //find replicas for this partitions
-                Set<String> partitionReplicas = 
replicationProperties.getNodeReplicasIds(partition.getNodeId());
-                //find a replica that is still active
-                for (String replica : partitionReplicas) {
-                    //TODO (mhubail) currently this assigns the partition to 
the first found active replica.
-                    //It needs to be modified to consider load balancing.
-                    if (addActiveReplica(replica, partition, 
partitionRecoveryPlan)) {
-                        break;
-                    }
-                }
-            }
-
-            if (partitionRecoveryPlan.size() == 0) {
-                //no active replicas were found for the failed node
-                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
-                return;
-            } else {
-                LOGGER.info("Partitions to recover: " + lostPartitions);
-            }
-            ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            //For each replica, send a request to takeover the assigned 
partitions
-            for (Entry<String, List<Integer>> entry : 
partitionRecoveryPlan.entrySet()) {
-                String replica = entry.getKey();
-                Integer[] partitionsToTakeover = entry.getValue().toArray(new 
Integer[entry.getValue().size()]);
-                long requestId = clusterRequestId++;
-                TakeoverPartitionsRequestMessage takeoverRequest = new 
TakeoverPartitionsRequestMessage(requestId,
-                        replica, partitionsToTakeover);
-                pendingTakeoverRequests.put(requestId, takeoverRequest);
-                try {
-                    messageBroker.sendApplicationMessageToNC(takeoverRequest, 
replica);
-                } catch (Exception e) {
-                    /**
-                     * if we fail to send the request, it means the NC we 
tried to send the request to
-                     * has failed. When the failure notification arrives, we 
will send any pending request
-                     * that belongs to the failed NC to a different active 
replica.
-                     */
-                    LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
-                }
-            }
-        }
-    }
-
-    private boolean addActiveReplica(String replica, ClusterPartition 
partition,
-            Map<String, List<Integer>> partitionRecoveryPlan) {
-        if (activeNcConfiguration.containsKey(replica) && 
!failedNodes.contains(replica)) {
-            if (!partitionRecoveryPlan.containsKey(replica)) {
-                List<Integer> replicaPartitions = new ArrayList<>();
-                replicaPartitions.add(partition.getPartitionId());
-                partitionRecoveryPlan.put(replica, replicaPartitions);
-            } else {
-                
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-            }
-            return true;
-        }
-        return false;
-    }
-
-    private synchronized List<ClusterPartition> 
getNodeAssignedPartitions(String nodeId) {
-        List<ClusterPartition> nodePartitions = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            if (partition.getActiveNodeId().equals(nodeId)) {
-                nodePartitions.add(partition);
-            }
-        }
-        /**
-         * if there is any pending takeover request that this node was 
supposed to handle,
-         * it needs to be sent to a different replica
-         */
-        List<Long> failedTakeoverRequests = new ArrayList<>();
-        for (TakeoverPartitionsRequestMessage request : 
pendingTakeoverRequests.values()) {
-            if (request.getNodeId().equals(nodeId)) {
-                for (Integer partitionId : request.getPartitions()) {
-                    nodePartitions.add(clusterPartitions.get(partitionId));
-                }
-                failedTakeoverRequests.add(request.getRequestId());
-            }
-        }
-
-        //remove failed requests
-        for (Long requestId : failedTakeoverRequests) {
-            pendingTakeoverRequests.remove(requestId);
-        }
-        return nodePartitions;
-    }
-
-    private synchronized void requestMetadataNodeTakeover() {
-        //need a new node to takeover metadata node
-        ClusterPartition metadataPartiton = 
AppContextInfo.INSTANCE.getMetadataProperties()
-                .getMetadataPartition();
-        //request the metadataPartition node to register itself as the 
metadata node
-        TakeoverMetadataNodeRequestMessage takeoverRequest = new 
TakeoverMetadataNodeRequestMessage();
-        ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        try {
-            messageBroker.sendApplicationMessageToNC(takeoverRequest, 
metadataPartiton.getActiveNodeId());
-        } catch (Exception e) {
-            /**
-             * if we fail to send the request, it means the NC we tried to 
send the request to
-             * has failed. When the failure notification arrives, a new NC 
will be assigned to
-             * the metadata partition and a new metadata node takeover request 
will be sent to it.
-             */
-            LOGGER.log(Level.WARNING,
-                    "Failed to send metadata node takeover request to: " + 
metadataPartiton.getActiveNodeId(), e);
-        }
-    }
-
-    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
-            throws HyracksDataException {
-        for (Integer partitonId : response.getPartitions()) {
-            ClusterPartition partition = clusterPartitions.get(partitonId);
-            partition.setActive(true);
-            partition.setActiveNodeId(response.getNodeId());
-        }
-        pendingTakeoverRequests.remove(response.getRequestId());
-        resetClusterPartitionConstraint();
-        updateClusterState();
-    }
-
-    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response)
-            throws HyracksDataException {
-        currentMetadataNode = response.getNodeId();
-        metadataNodeActive = true;
-        LOGGER.info("Current metadata node: " + currentMetadataNode);
-        updateClusterState();
-    }
-
-    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
-        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
-        pendingProcessingFailbackPlans.add(plan);
-        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
-        //get all partitions this node requires to resync
-        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> nodeReplicas = 
replicationProperties.getNodeReplicationClients(failingBackNodeId);
-        for (String replicaId : nodeReplicas) {
-            ClusterPartition[] nodePartitions = 
node2PartitionsMap.get(replicaId);
-            for (ClusterPartition partition : nodePartitions) {
-                plan.addParticipant(partition.getActiveNodeId());
-                /**
-                 * if the partition original node is the returning node,
-                 * add it to the list of the partitions which will be failed 
back
-                 */
-                if (partition.getNodeId().equals(failingBackNodeId)) {
-                    plan.addPartitionToFailback(partition.getPartitionId(), 
partition.getActiveNodeId());
-                }
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Prepared Failback plan: " + plan.toString());
-        }
-
-        processPendingFailbackPlans();
-    }
-
-    private synchronized void processPendingFailbackPlans() {
-        /**
-         * if the cluster state is not ACTIVE, then failbacks should not be 
processed
-         * since some partitions are not active
-         */
-        if (state == ClusterState.ACTIVE) {
-            while (!pendingProcessingFailbackPlans.isEmpty()) {
-                //take the first pending failback plan
-                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
-                /**
-                 * A plan at this stage will be in one of two states:
-                 * 1. PREPARING -> the participants were selected but we 
haven't sent any request.
-                 * 2. PENDING_ROLLBACK -> a participant failed before we send 
any requests
-                 */
-                if (plan.getState() == FailbackPlanState.PREPARING) {
-                    //set the partitions that will be failed back as inactive
-                    String failbackNode = plan.getNodeId();
-                    for (Integer partitionId : plan.getPartitionsToFailback()) 
{
-                        ClusterPartition clusterPartition = 
clusterPartitions.get(partitionId);
-                        clusterPartition.setActive(false);
-                        //partition expected to be returned to the failing 
back node
-                        clusterPartition.setActiveNodeId(failbackNode);
-                    }
-
-                    /**
-                     * if the returning node is the original metadata node,
-                     * then metadata node will change after the failback 
completes
-                     */
-                    String originalMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties()
-                            .getMetadataNodeName();
-                    if (originalMetadataNode.equals(failbackNode)) {
-                        
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
-                        currentMetadataNode = "";
-                        metadataNodeActive = false;
-                    }
-
-                    //force new jobs to wait
-                    state = ClusterState.REBALANCING;
-                    ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE
-                            .getCCApplicationContext().getMessageBroker();
-                    handleFailbackRequests(plan, messageBroker);
-                    /**
-                     * wait until the current plan is completed before 
processing the next plan.
-                     * when the current one completes or is reverted, the 
cluster state will be
-                     * ACTIVE again, and the next failback plan (if any) will 
be processed.
-                     */
-                    break;
-                } else if (plan.getState() == 
FailbackPlanState.PENDING_ROLLBACK) {
-                    //this plan failed before sending any requests -> nothing 
to rollback
-                    planId2FailbackPlanMap.remove(plan.getPlanId());
-                }
-            }
-        }
-    }
-
-    private void handleFailbackRequests(NodeFailbackPlan plan, 
ICCMessageBroker messageBroker) {
-        //send requests to other nodes to complete on-going jobs and prepare 
partitions for failback
-        for (PreparePartitionsFailbackRequestMessage request : 
plan.getPlanFailbackRequests()) {
-            try {
-                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeID());
-                plan.addPendingRequest(request);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
-                plan.notifyNodeFailure(request.getNodeID());
-                revertFailedFailbackPlanEffects();
-                break;
-            }
-        }
-    }
-
-    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
-        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
-        plan.markRequestCompleted(msg.getRequestId());
-        /**
-         * A plan at this stage will be in one of three states:
-         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still 
expected (wait).
-         * 2. PENDING_COMPLETION -> all responses received (time to send 
completion request).
-         * 3. PENDING_ROLLBACK -> the plan failed and we just received the 
final pending response (revert).
-         */
-        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
-            CompleteFailbackRequestMessage request = 
plan.getCompleteFailbackRequestMessage();
-
-            //send complete resync and takeover partitions to the failing back 
node
-            ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                    .getMessageBroker();
-            try {
-                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeId());
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
-                notifyFailbackPlansNodeFailure(request.getNodeId());
-                revertFailedFailbackPlanEffects();
-            }
-        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-            revertFailedFailbackPlanEffects();
-        }
-    }
-
-    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
-            throws HyracksDataException {
-        /**
-         * the failback plan completed successfully:
-         * Remove all references to it.
-         * Remove the the failing back node from the failed nodes list.
-         * Notify its replicas to reconnect to it.
-         * Set the failing back node partitions as active.
-         */
-        NodeFailbackPlan plan = 
planId2FailbackPlanMap.remove(response.getPlanId());
-        String nodeId = plan.getNodeId();
-        failedNodes.remove(nodeId);
-        //notify impacted replicas they can reconnect to this node
-        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
-        updateNodePartitions(nodeId, true);
-    }
-
-    private synchronized void notifyImpactedReplicas(String nodeId, 
ClusterEventType event) {
-        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
-        Set<String> remoteReplicas = 
replicationProperties.getRemoteReplicasIds(nodeId);
-        String nodeIdAddress = "";
-        //in case the node joined with a new IP address, we need to send it to 
the other replicas
-        if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = 
activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
-        }
-
-        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, 
nodeIdAddress, event);
-        ICCMessageBroker messageBroker = (ICCMessageBroker) 
AppContextInfo.INSTANCE.getCCApplicationContext()
-                .getMessageBroker();
-        for (String replica : remoteReplicas) {
-            //if the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, replica);
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
-                }
-            }
-        }
-    }
-
-    private synchronized void revertFailedFailbackPlanEffects() {
-        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                //TODO if the failing back node is still active, notify it to 
construct a new plan for it
-                iterator.remove();
-
-                //reassign the partitions that were supposed to be failed back 
to an active replica
-                requestPartitionsTakeover(plan.getNodeId());
-            }
-        }
-    }
-
-    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
-        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            plan.notifyNodeFailure(nodeId);
-        }
-    }
-
     public synchronized boolean isMetadataNodeActive() {
         return metadataNodeActive;
     }
@@ -676,4 +310,14 @@ public class ClusterStateManager {
         stateDescription.putPOJO("partitions", clusterPartitions);
         return stateDescription;
     }
+
+    @Override
+    public Map<String, Map<String, String>> getActiveNcConfiguration() {
+        return Collections.unmodifiableMap(activeNcConfiguration);
+    }
+
+    @Override
+    public String getCurrentMetadataNodeId() {
+        return currentMetadataNode;
+    }
 }

Reply via email to