http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
new file mode 100644
index 0000000..e6638e8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
+import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
+import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.RemoteRecoveryTask;
+import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import 
org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
+import 
org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.util.FaultToleranceUtil;
+import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MetadataNodeFaultToleranceStrategy implements 
IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = 
Logger.getLogger(MetadataNodeFaultToleranceStrategy.class.getName());
+    private IClusterStateManager clusterManager;
+    private String metadataNodeId;
+    private IReplicationStrategy replicationStrategy;
+    private ICCMessageBroker messageBroker;
+    private final Set<String> hotStandbyMetadataReplica = new HashSet<>();
+    private final Set<String> failedNodes = new HashSet<>();
+    private Set<String> pendingStartupCompletionNodes = new HashSet<>();
+
+    @Override
+    public synchronized void notifyNodeJoin(String nodeId) throws 
HyracksDataException {
+        pendingStartupCompletionNodes.add(nodeId);
+    }
+
+    @Override
+    public synchronized void notifyNodeFailure(String nodeId) throws 
HyracksDataException {
+        failedNodes.add(nodeId);
+        hotStandbyMetadataReplica.remove(nodeId);
+        clusterManager.updateNodePartitions(nodeId, false);
+        if (nodeId.equals(metadataNodeId)) {
+            clusterManager.updateMetadataNode(metadataNodeId, false);
+        }
+        clusterManager.refreshState();
+        if (replicationStrategy.isParticipant(nodeId)) {
+            // Notify impacted replica
+            FaultToleranceUtil.notifyImpactedReplicas(nodeId, 
ClusterEventType.NODE_FAILURE, clusterManager,
+                    messageBroker, replicationStrategy);
+        }
+        // If the failed node is the metadata node, ask its replicas to replay 
any committed jobs
+        if (nodeId.equals(metadataNodeId)) {
+            int metadataPartitionId = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition()
+                    .getPartitionId();
+            Set<Integer> metadataPartition = new 
HashSet<>(Arrays.asList(metadataPartitionId));
+            Set<Replica> activeRemoteReplicas = 
replicationStrategy.getRemoteReplicas(metadataNodeId).stream()
+                    .filter(replica -> 
!failedNodes.contains(replica.getId())).collect(Collectors.toSet());
+            //TODO Do election to identity the node with latest state
+            for (Replica replica : activeRemoteReplicas) {
+                ReplayPartitionLogsRequestMessage msg = new 
ReplayPartitionLogsRequestMessage(metadataPartition);
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, 
replica.getId());
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application 
message to an NC", e);
+                    continue;
+                }
+            }
+        }
+    }
+
+    @Override
+    public IFaultToleranceStrategy from(IReplicationStrategy 
replicationStrategy, ICCMessageBroker messageBroker) {
+        MetadataNodeFaultToleranceStrategy ft = new 
MetadataNodeFaultToleranceStrategy();
+        ft.replicationStrategy = replicationStrategy;
+        ft.messageBroker = messageBroker;
+        return ft;
+    }
+
+    @Override
+    public synchronized void process(INCLifecycleMessage message) throws 
HyracksDataException {
+        switch (message.getType()) {
+            case STARTUP_TASK_REQUEST:
+                process((StartupTaskRequestMessage) message);
+                break;
+            case STARTUP_TASK_RESULT:
+                process((NCLifecycleTaskReportMessage) message);
+                break;
+            case REPLAY_LOGS_RESPONSE:
+                process((ReplayPartitionLogsResponseMessage) message);
+                break;
+            default:
+                throw new HyracksDataException("Unsupported message type: " + 
message.getType().name());
+        }
+    }
+
+    @Override
+    public synchronized void bindTo(IClusterStateManager clusterManager) {
+        this.clusterManager = clusterManager;
+        this.metadataNodeId = clusterManager.getCurrentMetadataNodeId();
+    }
+
+    private synchronized void process(ReplayPartitionLogsResponseMessage msg) {
+        hotStandbyMetadataReplica.add(msg.getNodeId());
+        LOGGER.log(Level.INFO, "Hot Standby Metadata Replicas: " + 
hotStandbyMetadataReplica);
+    }
+
+    private synchronized void process(StartupTaskRequestMessage msg) throws 
HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        final SystemState state = msg.getState();
+        final boolean isParticipant = 
replicationStrategy.isParticipant(nodeId);
+        List<INCLifecycleTask> tasks;
+        if (!isParticipant) {
+            tasks = buildNonParticipantStartupSequence(nodeId, state);
+        } else {
+            tasks = buildParticipantStartupSequence(nodeId, state);
+        }
+        StartupTaskResponseMessage response = new 
StartupTaskResponseMessage(nodeId, tasks);
+        try {
+            messageBroker.sendApplicationMessageToNC(response, 
msg.getNodeId());
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    private synchronized void process(NCLifecycleTaskReportMessage msg) throws 
HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        pendingStartupCompletionNodes.remove(nodeId);
+        if (msg.isSuccess()) {
+            // If this node failed and recovered, notify impacted replicas to 
reconnect to it
+            if (replicationStrategy.isParticipant(nodeId) && 
failedNodes.remove(nodeId)) {
+                FaultToleranceUtil.notifyImpactedReplicas(nodeId, 
ClusterEventType.NODE_JOIN, clusterManager,
+                        messageBroker, replicationStrategy);
+            }
+            clusterManager.updateNodePartitions(msg.getNodeId(), true);
+            if (msg.getNodeId().equals(metadataNodeId)) {
+                clusterManager.updateMetadataNode(metadataNodeId, true);
+                // When metadata node is active, it is the only hot standby 
replica
+                hotStandbyMetadataReplica.clear();
+                hotStandbyMetadataReplica.add(metadataNodeId);
+            }
+            clusterManager.refreshState();
+        } else {
+            LOGGER.log(Level.SEVERE, msg.getNodeId() + " failed to complete 
startup. ", msg.getException());
+        }
+    }
+
+    private List<INCLifecycleTask> buildNonParticipantStartupSequence(String 
nodeId, SystemState state) {
+        final List<INCLifecycleTask> tasks = new ArrayList<>();
+        if (state == SystemState.CORRUPTED) {
+            //need to perform local recovery for node partitions
+            LocalRecoveryTask rt = new 
LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
+                    
.stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            tasks.add(rt);
+        }
+        tasks.add(new ExternalLibrarySetupTask(false));
+        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new CheckpointTask());
+        tasks.add(new StartLifecycleComponentsTask());
+        return tasks;
+    }
+
+    private List<INCLifecycleTask> buildParticipantStartupSequence(String 
nodeId, SystemState state) {
+        final List<INCLifecycleTask> tasks = new ArrayList<>();
+        switch (state) {
+            case NEW_UNIVERSE:
+                // If the metadata node (or replica) failed and lost its data
+                // => Metadata Remote Recovery from standby replica
+                tasks.add(getMetadataPartitionRecoveryPlan());
+                // Checkpoint after remote recovery to move node to HEALTHY 
state
+                tasks.add(new CheckpointTask());
+                break;
+            case CORRUPTED:
+                // If the metadata node (or replica) failed and started again 
without losing data => Local Recovery
+                LocalRecoveryTask rt = new 
LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
+                        
.stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+                tasks.add(rt);
+                break;
+            case INITIAL_RUN:
+            case HEALTHY:
+            case RECOVERING:
+                break;
+            default:
+                break;
+        }
+        tasks.add(new StartReplicationServiceTask());
+        final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+        if (isMetadataNode) {
+            tasks.add(new MetadataBootstrapTask());
+        }
+        tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
+        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new CheckpointTask());
+        tasks.add(new StartLifecycleComponentsTask());
+        if (isMetadataNode) {
+            tasks.add(new BindMetadataNodeTask(true));
+        }
+        return tasks;
+    }
+
+    private RemoteRecoveryTask getMetadataPartitionRecoveryPlan() {
+        if (hotStandbyMetadataReplica.isEmpty()) {
+            throw new IllegalStateException("No metadata replicas to recover 
from");
+        }
+        // Construct recovery plan: Node => Set of partitions to recover from 
it
+        Map<String, Set<Integer>> recoveryPlan = new HashMap<>();
+        // Recover metadata partition from any metadata hot standby replica
+        int metadataPartitionId = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition()
+                .getPartitionId();
+        Set<Integer> metadataPartition = new 
HashSet<>(Arrays.asList(metadataPartitionId));
+        recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), 
metadataPartition);
+        return new RemoteRecoveryTask(recoveryPlan);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
new file mode 100644
index 0000000..0ee4f6a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
+import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
+import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Fault tolerance strategy that does no replication-related work: it tracks node joins and
+ * failures, updates the cluster state accordingly and builds the startup task list sent to each
+ * starting node. The replication strategy passed to
+ * {@link #from(IReplicationStrategy, ICCMessageBroker)} is ignored.
+ * <p>
+ * The public entry points are synchronized on this instance, matching
+ * {@code MetadataNodeFaultToleranceStrategy}, because they mutate a plain {@link HashSet}.
+ */
+public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName());
+    IClusterStateManager clusterManager;
+    private String metadataNodeId;
+    private ICCMessageBroker messageBroker;
+    /** Nodes that joined and have neither reported a startup result nor failed yet. */
+    private final Set<String> pendingStartupCompletionNodes = new HashSet<>();
+
+    /**
+     * Records that {@code nodeId} joined and is expected to report its startup result.
+     *
+     * @param nodeId the id of the node that joined
+     */
+    @Override
+    public synchronized void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        pendingStartupCompletionNodes.add(nodeId);
+    }
+
+    /**
+     * Stops waiting for the failed node's startup report, deactivates its partitions (and the
+     * metadata node flag if it is the metadata node) and refreshes the cluster state.
+     *
+     * @param nodeId the id of the node that failed
+     */
+    @Override
+    public synchronized void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        pendingStartupCompletionNodes.remove(nodeId);
+        clusterManager.updateNodePartitions(nodeId, false);
+        if (nodeId.equals(metadataNodeId)) {
+            clusterManager.updateMetadataNode(metadataNodeId, false);
+        }
+        clusterManager.refreshState();
+    }
+
+    /**
+     * Dispatches a lifecycle message to the handler matching its type.
+     *
+     * @param message the message to handle
+     * @throws HyracksDataException if the message type is not supported or the handler fails
+     */
+    @Override
+    public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
+        switch (message.getType()) {
+            case STARTUP_TASK_REQUEST:
+                process((StartupTaskRequestMessage) message);
+                break;
+            case STARTUP_TASK_RESULT:
+                process((NCLifecycleTaskReportMessage) message);
+                break;
+            default:
+                throw new HyracksDataException("Unsupported message type: " + message.getType().name());
+        }
+    }
+
+    /**
+     * Creates a new strategy instance that uses the given message broker. The instance this
+     * method is invoked on is left untouched, and the returned instance still needs
+     * {@link #bindTo(IClusterStateManager)} before it can handle notifications.
+     *
+     * @param replicationStrategy ignored by this strategy
+     * @param messageBroker the broker used by the new instance to send messages to nodes
+     * @return the newly created strategy
+     */
+    @Override
+    public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
+        NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy();
+        ft.messageBroker = messageBroker;
+        return ft;
+    }
+
+    /**
+     * Stores the cluster state manager and caches the id of its current metadata node.
+     *
+     * @param clusterManager the cluster state manager to operate on
+     */
+    @Override
+    public synchronized void bindTo(IClusterStateManager clusterManager) {
+        this.clusterManager = clusterManager;
+        metadataNodeId = clusterManager.getCurrentMetadataNodeId();
+    }
+
+    /** Builds the startup task list for the requesting node and sends it back to that node. */
+    private void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        final List<INCLifecycleTask> tasks = buildNCStartupSequence(nodeId, msg.getState());
+        final StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    /**
+     * Handles a node's startup report. On success the node's partitions are activated (and the
+     * metadata node flag if applicable) and the cluster state is refreshed; on failure the error
+     * is only logged.
+     */
+    private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        pendingStartupCompletionNodes.remove(nodeId);
+        if (!msg.isSuccess()) {
+            LOGGER.log(Level.SEVERE, nodeId + " failed to complete startup. ", msg.getException());
+            return;
+        }
+        clusterManager.updateNodePartitions(nodeId, true);
+        if (nodeId.equals(metadataNodeId)) {
+            clusterManager.updateMetadataNode(metadataNodeId, true);
+        }
+        clusterManager.refreshState();
+    }
+
+    /**
+     * Builds the ordered startup tasks for {@code nodeId}: local recovery first when the node
+     * reports a CORRUPTED state, then the common startup tasks. The metadata node additionally
+     * gets the metadata bootstrap and bind tasks.
+     */
+    private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) {
+        final List<INCLifecycleTask> tasks = new ArrayList<>();
+        if (state == SystemState.CORRUPTED) {
+            //need to perform local recovery for node partitions
+            tasks.add(new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(nodeId))
+                    .map(ClusterPartition::getPartitionId).collect(Collectors.toSet())));
+        }
+        final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+        if (isMetadataNode) {
+            tasks.add(new MetadataBootstrapTask());
+        }
+        tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
+        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new CheckpointTask());
+        tasks.add(new StartLifecycleComponentsTask());
+        if (isMetadataNode) {
+            tasks.add(new BindMetadataNodeTask(true));
+        }
+        return tasks;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
new file mode 100644
index 0000000..1abc3f0
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import 
org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
+import 
org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
+
+public class NodeFailbackPlan {
+
+    public enum FailbackPlanState {
+        /**
+         * Initial state while selecting the nodes that will participate
+         * in the node failback plan.
+         */
+        PREPARING,
+        /**
+         * Once a pending {@link PreparePartitionsFailbackRequestMessage} 
request is added,
+         * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE 
to indicate
+         * a response is expected and need to wait for it.
+         */
+        PENDING_PARTICIPANT_REPONSE,
+        /**
+         * Upon receiving the last {@link 
PreparePartitionsFailbackResponseMessage} response,
+         * the state changes from PENDING_PARTICIPANT_REPONSE to 
PENDING_COMPLETION to indicate
+         * the need to send {@link CompleteFailbackRequestMessage} to the 
failing back node.
+         */
+        PENDING_COMPLETION,
+        /**
+         * if any of the participants fail or the failing back node itself 
fails during
+         * and of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, 
PENDING_COMPLETION),
+         * the state is changed to FAILED.
+         */
+        FAILED,
+        /**
+         * if the state is FAILED, and all pending responses (if any) have 
been received,
+         * the state changes from FAILED to PENDING_ROLLBACK to indicate the 
need to revert
+         * the effects of this plan (if any).
+         */
+        PENDING_ROLLBACK
+    }
+
+    private static long planIdGenerator = 0;
+    private long planId;
+    private final String nodeId;
+    private final Set<String> participants;
+    private final Map<Integer, String> partition2nodeMap;
+    private String nodeToReleaseMetadataManager;
+    private int requestId;
+    private Map<Integer, PreparePartitionsFailbackRequestMessage> 
pendingRequests;
+    private FailbackPlanState state;
+
+    public static NodeFailbackPlan createPlan(String nodeId) {
+        return new NodeFailbackPlan(planIdGenerator++, nodeId);
+    }
+
+    private NodeFailbackPlan(long planId, String nodeId) {
+        this.planId = planId;
+        this.nodeId = nodeId;
+        participants = new HashSet<>();
+        partition2nodeMap = new HashMap<>();
+        pendingRequests = new HashMap<>();
+        state = FailbackPlanState.PREPARING;
+    }
+
+    public synchronized void addPartitionToFailback(int partitionId, String 
currentActiveNode) {
+        partition2nodeMap.put(partitionId, currentActiveNode);
+    }
+
+    public synchronized void addParticipant(String nodeId) {
+        participants.add(nodeId);
+    }
+
+    public synchronized void notifyNodeFailure(String failedNode) {
+        if (participants.contains(failedNode)) {
+            if (state == FailbackPlanState.PREPARING) {
+                state = FailbackPlanState.FAILED;
+            } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) 
{
+                /**
+                 * if there is any pending request from this failed node,
+                 * it should be marked as completed and the plan should be 
marked as failed
+                 */
+                Set<Integer> failedRequests = new HashSet<>();
+                for (PreparePartitionsFailbackRequestMessage request : 
pendingRequests.values()) {
+                    if (request.getNodeID().equals(failedNode)) {
+                        failedRequests.add(request.getRequestId());
+                    }
+                }
+
+                if (!failedRequests.isEmpty()) {
+                    state = FailbackPlanState.FAILED;
+                    for (Integer failedRequestId : failedRequests) {
+                        markRequestCompleted(failedRequestId);
+                    }
+                }
+            }
+        } else if (nodeId.equals(failedNode)) {
+            //if the failing back node is the failed node itself
+            state = FailbackPlanState.FAILED;
+            updateState();
+        }
+    }
+
+    public synchronized Set<Integer> getPartitionsToFailback() {
+        return new HashSet<>(partition2nodeMap.keySet());
+    }
+
+    public synchronized void 
addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
+        //if this is the first request
+        if (pendingRequests.size() == 0) {
+            state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
+        }
+        pendingRequests.put(msg.getRequestId(), msg);
+    }
+
+    public synchronized void markRequestCompleted(int requestId) {
+        pendingRequests.remove(requestId);
+        updateState();
+    }
+
+    private void updateState() {
+        if (pendingRequests.size() == 0) {
+            switch (state) {
+                case PREPARING:
+                case FAILED:
+                    state = FailbackPlanState.PENDING_ROLLBACK;
+                    break;
+                case PENDING_PARTICIPANT_REPONSE:
+                    state = FailbackPlanState.PENDING_COMPLETION;
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    public synchronized Set<PreparePartitionsFailbackRequestMessage> 
getPlanFailbackRequests() {
+        Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new 
HashSet<>();
+        /**
+         * for each participant, construct a request with the partitions
+         * that will be failed back or flushed.
+         */
+        for (String participant : participants) {
+            Set<Integer> partitionToPrepareForFailback = new HashSet<>();
+            for (Map.Entry<Integer, String> entry : 
partition2nodeMap.entrySet()) {
+                if (entry.getValue().equals(participant)) {
+                    partitionToPrepareForFailback.add(entry.getKey());
+                }
+            }
+            PreparePartitionsFailbackRequestMessage msg = new 
PreparePartitionsFailbackRequestMessage(planId,
+                    requestId++, participant, partitionToPrepareForFailback);
+            if (participant.equals(nodeToReleaseMetadataManager)) {
+                msg.setReleaseMetadataNode(true);
+            }
+            node2Partitions.add(msg);
+        }
+        return node2Partitions;
+    }
+
+    public synchronized CompleteFailbackRequestMessage 
getCompleteFailbackRequestMessage() {
+        return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, 
getPartitionsToFailback());
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public long getPlanId() {
+        return planId;
+    }
+
+    public void setNodeToReleaseMetadataManager(String 
nodeToReleaseMetadataManager) {
+        this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
+    }
+
+    public synchronized FailbackPlanState getState() {
+        return state;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Failing back node: " + nodeId);
+        sb.append(" Participants: " + participants);
+        sb.append(" Partitions to Failback: " + partition2nodeMap.keySet());
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
new file mode 100644
index 0000000..2d423f9
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CompleteFailbackRequestMessage extends 
AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = 
Logger.getLogger(CompleteFailbackRequestMessage.class.getName());
+    private final Set<Integer> partitions;
+    private final String nodeId;
+
+    public CompleteFailbackRequestMessage(long planId, int requestId, String 
nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeId = nodeId;
+        this.partitions = partitions;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Partitions: " + partitions);
+        return sb.toString();
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, 
InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext =
+                (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) 
ncs.getApplicationContext().getMessageBroker();
+        HyracksDataException hde = null;
+        try {
+            IRemoteRecoveryManager remoteRecoeryManager = 
appContext.getRemoteRecoveryManager();
+            remoteRecoeryManager.completeFailbackProcess();
+        } catch (IOException | InterruptedException e) {
+            LOGGER.log(Level.SEVERE, "Failure during completion of failback 
process", e);
+            hde = ExceptionUtils.convertToHyracksDataException(e);
+        } finally {
+            CompleteFailbackResponseMessage reponse = new 
CompleteFailbackResponseMessage(planId,
+                    requestId, partitions);
+            try {
+                broker.sendMessageToCC(reponse);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            }
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.COMPLETE_FAILBACK_REQUEST;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
new file mode 100644
index 0000000..fb45892
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+import java.util.Set;
+
+/**
+ * Response to a complete-failback request. When handled, it is handed over to the
+ * fault tolerance strategy obtained from {@link AppContextInfo}.
+ */
+public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    /**
+     * @param planId the id of the failback plan this response belongs to
+     * @param requestId the id of the request being answered
+     * @param partitions the partitions carried by this response
+     */
+    public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.partitions = partitions;
+    }
+
+    /**
+     * @return the partitions carried by this response
+     */
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.COMPLETE_FAILBACK_RESPONSE;
+    }
+
+    /**
+     * Delegates the processing of this response to the fault tolerance strategy.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+    }
+
+    @Override
+    public String toString() {
+        return CompleteFailbackResponseMessage.class.getSimpleName()
+                + " Plan ID: " + planId
+                + " Partitions: " + partitions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
new file mode 100644
index 0000000..3af075e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Report carrying a node id, a success flag and an optional exception. When handled, it is
+ * handed over to the fault tolerance strategy obtained from {@link AppContextInfo}.
+ */
+public class NCLifecycleTaskReportMessage implements INCLifecycleMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final boolean success;
+    // optional; stays null unless set through setException
+    private Exception exception;
+
+    /**
+     * @param nodeId the id of the node this report is about
+     * @param success the outcome flag of the report
+     */
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success) {
+        this.nodeId = nodeId;
+        this.success = success;
+    }
+
+    /**
+     * @return the id of the node this report is about
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return the outcome flag of the report
+     */
+    public boolean isSuccess() {
+        return success;
+    }
+
+    /**
+     * @return the exception attached to this report, or {@code null} if none was set
+     */
+    public Exception getException() {
+        return exception;
+    }
+
+    /**
+     * @param exception the exception to attach to this report
+     */
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.STARTUP_TASK_RESULT;
+    }
+
+    /**
+     * Delegates the processing of this report to the fault tolerance strategy.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
new file mode 100644
index 0000000..2104f9c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.rmi.RemoteException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class PreparePartitionsFailbackRequestMessage extends 
AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = 
Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName());
+    private final Set<Integer> partitions;
+    private boolean releaseMetadataNode = false;
+    private final String nodeID;
+
+    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, 
String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeID = nodeId;
+        this.partitions = partitions;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public boolean isReleaseMetadataNode() {
+        return releaseMetadataNode;
+    }
+
+    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
+        this.releaseMetadataNode = releaseMetadataNode;
+    }
+
+    public String getNodeID() {
+        return nodeID;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        
sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
+        sb.append(" Partitions: " + partitions);
+        sb.append(" releaseMetadataNode: " + releaseMetadataNode);
+        return sb.toString();
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, 
InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext =
+                (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) 
ncs.getApplicationContext().getMessageBroker();
+        /**
+         * if the metadata partition will be failed back
+         * we need to flush and close all datasets including metadata datasets
+         * otherwise we need to close all non-metadata datasets and flush 
metadata datasets
+         * so that their memory components will be copied to the failing back 
node
+         */
+        if (releaseMetadataNode) {
+            appContext.getDatasetLifecycleManager().closeAllDatasets();
+            //remove the metadata node stub from RMI registry
+            try {
+                appContext.unexportMetadataNodeStub();
+            } catch (RemoteException e) {
+                LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", 
e);
+                throw ExceptionUtils.convertToHyracksDataException(e);
+            }
+        } else {
+            //close all non-metadata datasets
+            appContext.getDatasetLifecycleManager().closeUserDatasets();
+            //flush the remaining metadata datasets that were not closed
+            appContext.getDatasetLifecycleManager().flushAllDatasets();
+        }
+
+        //mark the partitions to be closed as inactive
+        PersistentLocalResourceRepository localResourceRepo = 
(PersistentLocalResourceRepository) appContext
+                .getLocalResourceRepository();
+        for (Integer partitionId : partitions) {
+            localResourceRepo.addInactivePartition(partitionId);
+        }
+
+        //send response after partitions prepared for failback
+        PreparePartitionsFailbackResponseMessage reponse = new 
PreparePartitionsFailbackResponseMessage(planId,
+                requestId, partitions);
+        try {
+            broker.sendMessageToCC(reponse);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.PREPARE_FAILBACK_REQUEST;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
new file mode 100644
index 0000000..e02cd42
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+import java.util.Set;
+
+/**
+ * Response to a prepare-partitions-failback request. When handled, it is handed over to the
+ * fault tolerance strategy obtained from {@link AppContextInfo}.
+ */
+public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    /**
+     * @param planId the id of the failback plan this response belongs to
+     * @param requestId the id of the request being answered
+     * @param partitions the partitions carried by this response
+     */
+    public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.partitions = partitions;
+    }
+
+    /**
+     * @return the partitions carried by this response
+     */
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.PREPARE_FAILBACK_RESPONSE;
+    }
+
+    /**
+     * Delegates the processing of this response to the fault tolerance strategy.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+    }
+
+    @Override
+    public String toString() {
+        final String partitionsText = partitions.toString();
+        return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitionsText;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
new file mode 100644
index 0000000..96ae8be
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Request handled on a node controller that replays the logs of a set of partitions and then
+ * sends a {@link ReplayPartitionLogsResponseMessage} to the cluster controller.
+ */
+public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage {
+
+    private static final Logger LOGGER = Logger.getLogger(ReplayPartitionLogsRequestMessage.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    /**
+     * @param partitions the partitions whose logs must be replayed
+     */
+    public ReplayPartitionLogsRequestMessage(Set<Integer> partitions) {
+        this.partitions = partitions;
+    }
+
+    /**
+     * Replays the logs of the requested partitions, then reports back to the cluster controller.
+     *
+     * @param cs the controller service handling the message; must be a {@link NodeControllerService}
+     * @throws HyracksDataException if sending the response to the cluster controller fails
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        final NodeControllerService nodeController = (NodeControllerService) cs;
+        final IAppRuntimeContext runtimeCtx =
+                (IAppRuntimeContext) nodeController.getApplicationContext().getApplicationObject();
+        // Replay the logs for these partitions and flush any impacted dataset
+        runtimeCtx.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true);
+
+        final INCMessageBroker messageBroker =
+                (INCMessageBroker) nodeController.getApplicationContext().getMessageBroker();
+        final ReplayPartitionLogsResponseMessage response =
+                new ReplayPartitionLogsResponseMessage(nodeController.getId(), partitions);
+        try {
+            messageBroker.sendMessageToCC(response);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.REPLAY_LOGS_REQUEST;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
new file mode 100644
index 0000000..dc19735
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+import java.util.Set;
+
+/**
+ * Response to a replay-partition-logs request, carrying the responding node id and the
+ * partitions involved. When handled, it is handed over to the fault tolerance strategy
+ * obtained from {@link AppContextInfo}.
+ */
+public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+    private final String nodeId;
+
+    /**
+     * @param nodeId the node id carried by this response
+     * @param partitions the partitions carried by this response
+     */
+    public ReplayPartitionLogsResponseMessage(String nodeId, Set<Integer> partitions) {
+        this.nodeId = nodeId;
+        this.partitions = partitions;
+    }
+
+    /**
+     * @return the node id carried by this response
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return the partitions carried by this response
+     */
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.REPLAY_LOGS_RESPONSE;
+    }
+
+    /**
+     * Delegates the processing of this response to the fault tolerance strategy.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
new file mode 100644
index 0000000..be42a9d
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Message carrying a node id and its {@link SystemState}, sent to the cluster controller through
+ * {@link #send(NodeControllerService, SystemState)}. When handled, it is handed over to the
+ * fault tolerance strategy obtained from {@link AppContextInfo}.
+ */
+public class StartupTaskRequestMessage implements INCLifecycleMessage {
+
+    private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final SystemState state;
+    private final String nodeId;
+
+    /**
+     * @param nodeId the id of the requesting node
+     * @param state the system state reported by the requesting node
+     */
+    public StartupTaskRequestMessage(String nodeId, SystemState state) {
+        this.nodeId = nodeId;
+        this.state = state;
+    }
+
+    /**
+     * Creates a request for the given node controller and sends it to the cluster controller.
+     *
+     * @param cs the node controller on whose behalf the request is sent
+     * @param systemState the system state to report
+     * @throws HyracksDataException if the request cannot be sent
+     */
+    public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException {
+        try {
+            final StartupTaskRequestMessage request = new StartupTaskRequestMessage(cs.getId(), systemState);
+            final INCMessageBroker messageBroker = (INCMessageBroker) cs.getApplicationContext().getMessageBroker();
+            messageBroker.sendMessageToCC(request);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    /**
+     * Delegates the processing of this request to the fault tolerance strategy.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+    }
+
+    /**
+     * @return the id of the requesting node
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return the system state reported by the requesting node
+     */
+    public SystemState getState() {
+        return state;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.STARTUP_TASK_REQUEST;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
new file mode 100644
index 0000000..6a72776
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Message handled on a node controller that carries the list of {@link INCLifecycleTask}s the
+ * node has to perform. After running the tasks, the outcome is reported to the cluster
+ * controller in an {@link NCLifecycleTaskReportMessage}.
+ */
+public class StartupTaskResponseMessage implements INCLifecycleMessage {
+
+    private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final List<INCLifecycleTask> tasks;
+
+    /**
+     * @param nodeId the id of the node the tasks are intended for; echoed back in the report
+     * @param tasks the tasks to perform, in order
+     */
+    public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
+        this.nodeId = nodeId;
+        this.tasks = tasks;
+    }
+
+    /**
+     * Performs the tasks in order, stopping at the first failure, and reports the outcome to the
+     * cluster controller. Both {@link HyracksDataException}s and unchecked exceptions raised by a
+     * task are reported as a failure instead of escaping before a report is sent.
+     *
+     * @param cs the controller service handling the message; must be a {@link NodeControllerService}
+     * @throws HyracksDataException if the report cannot be sent to the cluster controller
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        boolean success = true;
+        Exception exception = null;
+        try {
+            for (INCLifecycleTask task : tasks) {
+                task.perform(cs);
+            }
+        } catch (HyracksDataException | RuntimeException e) {
+            // unchecked failures must be reported as well; otherwise no report is ever sent for this node
+            LOGGER.log(Level.SEVERE, "Failed performing startup task", e);
+            success = false;
+            exception = e;
+        }
+        NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
+        result.setException(exception);
+        try {
+            broker.sendMessageToCC(result);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    /**
+     * @return the id of the node the tasks are intended for
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.STARTUP_TASK_RESPONSE;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..8ce12dd
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Lifecycle message handled on a node controller: it initializes the metadata on the receiving node,
+ * exports the metadata node stub and then sends a {@link TakeoverMetadataNodeResponseMessage} to the
+ * cluster controller.
+ */
+public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
+
+    /**
+     * Initializes the metadata and exports the metadata node stub, then always sends a response to the
+     * cluster controller, even when the takeover itself failed.
+     *
+     * @param cs the controller service; must be a {@link NodeControllerService}
+     * @throws HyracksDataException if the takeover failed and/or the response could not be sent; when
+     *             both fail, the send failure is suppressed into the takeover failure
+     * @throws InterruptedException declared by the message interface
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext =
+                (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        HyracksDataException hde = null;
+        try {
+            appContext.initializeMetadata(false);
+            appContext.exportMetadataNodeStub();
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
+            hde = new HyracksDataException(e);
+        } finally {
+            // the response is sent whether or not the takeover succeeded
+            TakeoverMetadataNodeResponseMessage response = new TakeoverMetadataNodeResponseMessage(
+                    appContext.getTransactionSubsystem().getId());
+            try {
+                broker.sendMessageToCC(response);
+            } catch (Exception e) {
+                // distinct message so a send failure is not mistaken for a takeover failure in the logs
+                LOGGER.log(Level.SEVERE, "Failed sending takeover metadata node response to cc", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            }
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    /**
+     * @return the simple class name; this message carries no payload
+     */
+    @Override
+    public String toString() {
+        return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
+    }
+
+    /**
+     * @return always {@code MessageType.TAKEOVER_METADATA_NODE_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.TAKEOVER_METADATA_NODE_REQUEST;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..428047c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Lifecycle message carrying the id of a node; when handled it is passed to the fault tolerance
+ * strategy obtained from {@link AppContextInfo#INSTANCE}.
+ */
+public class TakeoverMetadataNodeResponseMessage implements INCLifecycleMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+
+    /**
+     * @param nodeId id of the node this response refers to
+     */
+    public TakeoverMetadataNodeResponseMessage(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return always {@code MessageType.TAKEOVER_METADATA_NODE_RESPONSE}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+    }
+
+    /**
+     * Delegates processing of this response to the configured fault tolerance strategy.
+     *
+     * @param cs the controller service; not used by this implementation
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+    }
+
+    /**
+     * @return the node id given at construction time
+     */
+    public String getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @return the simple class name
+     */
+    @Override
+    public String toString() {
+        return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..4e415de
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Lifecycle message handled on a node controller: it asks the remote recovery manager to take over
+ * the given partitions and then sends a {@link TakeoverPartitionsResponseMessage} to the cluster
+ * controller.
+ */
+public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
+    private final Integer[] partitions;
+    private final long requestId;
+    private final String nodeId;
+
+    /**
+     * @param requestId id of this request; echoed back in the response
+     * @param nodeId node id associated with this request
+     * @param partitionsToTakeover partitions passed to the remote recovery manager (stored by reference)
+     */
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.partitions = partitionsToTakeover;
+    }
+
+    /**
+     * @return the partitions array given at construction time (not copied)
+     */
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @return the request id given at construction time
+     */
+    public long getRequestId() {
+        return requestId;
+    }
+
+    /**
+     * @return the node id given at construction time
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return class name, request id, node id and the comma-separated partitions, without a
+     *         trailing comma
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName());
+        sb.append(" Request ID: ").append(requestId);
+        sb.append(" Node ID: ").append(nodeId);
+        sb.append(" Partitions: ");
+        // separators go between elements, so nothing has to be trimmed afterwards and an empty
+        // array leaves the label intact
+        for (int i = 0; i < partitions.length; i++) {
+            if (i > 0) {
+                sb.append(',');
+            }
+            sb.append(partitions[i]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Takes over the requested partitions unless the node is shutting down, then always sends a
+     * response to the cluster controller, even when the takeover itself failed.
+     *
+     * @param cs the controller service; must be a {@link NodeControllerService}
+     * @throws HyracksDataException if the takeover failed and/or the response could not be sent; when
+     *             both fail, the send failure is suppressed into the takeover failure
+     * @throws InterruptedException declared by the message interface
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        //if the NC is shutting down, it should ignore takeover partitions request
+        if (!appContext.isShuttingdown()) {
+            HyracksDataException hde = null;
+            try {
+                IRemoteRecoveryManager remoteRecoveryManager = appContext.getRemoteRecoveryManager();
+                remoteRecoveryManager.takeoverPartitons(partitions);
+            } catch (IOException | ACIDException e) {
+                LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            } finally {
+                //send response after takeover is completed
+                TakeoverPartitionsResponseMessage response = new TakeoverPartitionsResponseMessage(requestId,
+                        appContext.getTransactionSubsystem().getId(), partitions);
+                try {
+                    broker.sendMessageToCC(response);
+                } catch (Exception e) {
+                    // distinct message so a send failure is not mistaken for a takeover failure in the logs
+                    LOGGER.log(Level.SEVERE, "Failure sending takeover partitions response to cc", e);
+                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+                }
+            }
+            if (hde != null) {
+                throw hde;
+            }
+        }
+    }
+
+    /**
+     * @return always {@code MessageType.TAKEOVER_PARTITION_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.TAKEOVER_PARTITION_REQUEST;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..e653a64
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Lifecycle message carrying a request id, a node id and a partitions array; when handled it is
+ * passed to the fault tolerance strategy obtained from {@link AppContextInfo#INSTANCE}.
+ */
+public class TakeoverPartitionsResponseMessage implements INCLifecycleMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String nodeId;
+    private final long requestId;
+
+    /**
+     * @param requestId id of the request this response belongs to
+     * @param nodeId id of the node this response refers to
+     * @param partitionsToTakeover partitions this response refers to (stored by reference)
+     */
+    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.partitions = partitionsToTakeover;
+        this.nodeId = nodeId;
+        this.requestId = requestId;
+    }
+
+    /**
+     * @return always {@code MessageType.TAKEOVER_PARTITION_RESPONSE}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.TAKEOVER_PARTITION_RESPONSE;
+    }
+
+    /**
+     * Delegates processing of this response to the configured fault tolerance strategy.
+     *
+     * @param cs the controller service; not used by this implementation
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+    }
+
+    /**
+     * @return the request id given at construction time
+     */
+    public long getRequestId() {
+        return this.requestId;
+    }
+
+    /**
+     * @return the node id given at construction time
+     */
+    public String getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @return the partitions array given at construction time (not copied)
+     */
+    public Integer[] getPartitions() {
+        return this.partitions;
+    }
+
+    /**
+     * @return the simple class name
+     */
+    @Override
+    public String toString() {
+        return TakeoverPartitionsResponseMessage.class.getSimpleName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 62eb250..f3182cf 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -18,41 +18,23 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 import org.apache.asterix.active.ActiveLifecycleListener;
-import org.apache.asterix.api.http.server.ApiServlet;
-import org.apache.asterix.api.http.server.ClusterApiServlet;
-import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
-import org.apache.asterix.api.http.server.ConnectorApiServlet;
-import org.apache.asterix.api.http.server.DdlApiServlet;
-import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
-import org.apache.asterix.api.http.server.FeedServlet;
-import org.apache.asterix.api.http.server.FullApiServlet;
-import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
-import org.apache.asterix.api.http.server.QueryApiServlet;
-import org.apache.asterix.api.http.server.QueryResultApiServlet;
-import org.apache.asterix.api.http.server.QueryServiceServlet;
-import org.apache.asterix.api.http.server.QueryStatusApiServlet;
-import org.apache.asterix.api.http.server.QueryWebInterfaceServlet;
-import org.apache.asterix.api.http.server.ShutdownApiServlet;
-import org.apache.asterix.api.http.server.UpdateApiServlet;
-import org.apache.asterix.api.http.server.VersionApiServlet;
+import org.apache.asterix.api.http.server.*;
 import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.cc.ResourceIdManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.file.StorageComponentProvider;
@@ -70,13 +52,19 @@ import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
-import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.http.api.IServlet;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
 
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
+import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
 public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     private static final Logger LOGGER = 
Logger.getLogger(CCApplicationEntryPoint.class.getName());
@@ -90,7 +78,7 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws 
Exception {
         final ClusterControllerService controllerService = 
(ClusterControllerService) ccAppCtx.getControllerService();
-        IMessageBroker messageBroker = new CCMessageBroker(controllerService);
+        ICCMessageBroker messageBroker = new 
CCMessageBroker(controllerService);
         this.appCtx = ccAppCtx;
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -100,11 +88,14 @@ public class CCApplicationEntryPoint implements 
ICCApplicationEntryPoint {
         appCtx.setThreadFactory(new 
AsterixThreadFactory(appCtx.getThreadFactory(), new 
LifeCycleComponentManager()));
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ResourceIdManager resourceIdManager = new ResourceIdManager();
+        IReplicationStrategy repStrategy = 
ClusterProperties.INSTANCE.getReplicationStrategy();
+        IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory
+                .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, 
messageBroker);
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager.instantiate((HyracksConnection) 
getNewHyracksClientConnection(), componentProvider);
         AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), 
libraryManager, resourceIdManager,
-                () -> MetadataManager.INSTANCE, 
GlobalRecoveryManager.instance());
+                () -> MetadataManager.INSTANCE, 
GlobalRecoveryManager.instance(), ftStrategy);
         ccExtensionManager = new CCExtensionManager(getExtensions());
         AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
         final CCConfig ccConfig = controllerService.getCCConfig();

Reply via email to