Introduce Strategy Based Replication and Fault-Tolerance

This change includes the following:
- Introduce new APIs for Replication and Fault-Tolerance Strategies.
- Add high-availability configuration to the cluster description file.
- Add built-in replication strategies (Metadata_Only, Chained_Declustering).
- Add built-in fault-tolerance strategies (Auto, Metadata_Node).
- Remove non-cluster-state functionality from ClusterStateManager.
- Add a customizable NC startup sequence.

Change-Id: I1d1012f5541ce786f127866efefb9f3db434fedd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1405
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ef173f34
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ef173f34
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ef173f34

Branch: refs/heads/master
Commit: ef173f34f0b26627a6a99ed350becf0f6c1b8395
Parents: fff200c
Author: Murtadha Hubail <[email protected]>
Authored: Sun Feb 19 20:27:29 2017 +0300
Committer: abdullah alamoudi <[email protected]>
Committed: Sun Feb 19 12:29:35 2017 -0800

----------------------------------------------------------------------
 .../common/AsterixHyracksIntegrationUtil.java   |  34 +-
 .../asterix/app/nc/NCAppRuntimeContext.java     |  10 +-
 .../apache/asterix/app/nc/RecoveryManager.java  |  44 +-
 .../asterix/app/nc/TransactionSubsystem.java    |  18 +-
 .../app/nc/task/BindMetadataNodeTask.java       |  51 ++
 .../asterix/app/nc/task/CheckpointTask.java     |  39 ++
 .../app/nc/task/ExternalLibrarySetupTask.java   |  48 ++
 .../asterix/app/nc/task/LocalRecoveryTask.java  |  51 ++
 .../app/nc/task/MetadataBootstrapTask.java      |  44 ++
 .../asterix/app/nc/task/RemoteRecoveryTask.java |  54 ++
 .../app/nc/task/ReportMaxResourceIdTask.java    |  35 ++
 .../asterix/app/nc/task/StartFailbackTask.java  |  37 ++
 .../nc/task/StartLifecycleComponentsTask.java   |  67 +++
 .../nc/task/StartReplicationServiceTask.java    |  49 ++
 .../replication/AutoFaultToleranceStrategy.java | 526 +++++++++++++++++++
 .../FaultToleranceStrategyFactory.java          |  66 +++
 .../MetadataNodeFaultToleranceStrategy.java     | 253 +++++++++
 .../replication/NoFaultToleranceStrategy.java   | 146 +++++
 .../app/replication/NodeFailbackPlan.java       | 209 ++++++++
 .../message/CompleteFailbackRequestMessage.java |  98 ++++
 .../CompleteFailbackResponseMessage.java        |  60 +++
 .../message/NCLifecycleTaskReportMessage.java   |  63 +++
 ...PreparePartitionsFailbackRequestMessage.java | 125 +++++
 ...reparePartitionsFailbackResponseMessage.java |  56 ++
 .../ReplayPartitionLogsRequestMessage.java      |  64 +++
 .../ReplayPartitionLogsResponseMessage.java     |  56 ++
 .../message/StartupTaskRequestMessage.java      |  72 +++
 .../message/StartupTaskResponseMessage.java     |  77 +++
 .../TakeoverMetadataNodeRequestMessage.java     |  74 +++
 .../TakeoverMetadataNodeResponseMessage.java    |  53 ++
 .../TakeoverPartitionsRequestMessage.java       | 111 ++++
 .../TakeoverPartitionsResponseMessage.java      |  65 +++
 .../bootstrap/CCApplicationEntryPoint.java      |  45 +-
 .../bootstrap/NCApplicationEntryPoint.java      | 177 ++-----
 .../apache/asterix/util/FaultToleranceUtil.java |  67 +++
 .../asterix-app/src/main/resources/cluster.xml  |  20 +-
 .../results/api/replication/replication.1.adm   |   4 +-
 .../common/api/IDatasetLifecycleManager.java    |   9 +
 .../asterix/common/api/INCLifecycleTask.java    |  36 ++
 .../common/cluster/IClusterStateManager.java    |  85 +++
 .../common/config/ClusterProperties.java        |  55 +-
 .../common/config/ReplicationProperties.java    | 208 ++------
 .../common/context/DatasetLifecycleManager.java |  10 +
 .../ChainedDeclusteringReplicationStrategy.java |  85 +++
 .../replication/IFaultToleranceStrategy.java    |  67 +++
 .../common/replication/INCLifecycleMessage.java |  45 ++
 .../replication/IRemoteRecoveryManager.java     |  22 +
 .../common/replication/IReplicationManager.java |   6 +-
 .../replication/IReplicationStrategy.java       |  58 ++
 .../MetadataOnlyReplicationStrategy.java        |  90 ++++
 .../replication/NoReplicationStrategy.java      |  52 ++
 .../asterix/common/replication/Replica.java     |   6 +-
 .../replication/ReplicationStrategyFactory.java |  62 +++
 .../common/storage/IndexFileProperties.java     |  74 +++
 .../common/transactions/IRecoveryManager.java   |   3 +
 .../asterix/common/utils/StoragePathUtil.java   |  11 +
 .../src/main/resources/schema/cluster.xsd       |  36 +-
 .../apache/asterix/test/aql/TestExecutor.java   |  16 +-
 .../installer/command/ConfigureCommand.java     |   3 +-
 .../installer/command/ValidateCommand.java      |  36 +-
 .../local/local_chained_declustering_rep.xml    |  81 +++
 .../clusters/local/local_metadata_only_rep.xml  |  83 +++
 .../clusters/local/local_with_replication.xml   |  76 ---
 .../test/AsterixInstallerIntegrationUtil.java   |   6 +-
 .../installer/test/MetadataReplicationIT.java   | 118 +++++
 .../asterix/installer/test/ReplicationIT.java   |   2 +-
 .../metadata_node_recovery.1.ddl.aql            |  50 ++
 .../metadata_node_recovery.10.node.aql          |  30 ++
 .../metadata_node_recovery.11.sleep.aql         |   1 +
 .../metadata_node_recovery.12.mgx.aql           |   1 +
 .../metadata_node_recovery.13.sleep.aql         |   1 +
 .../metadata_node_recovery.14.ddl.aql           |  33 ++
 .../metadata_node_recovery.15.node.aql          |  30 ++
 .../metadata_node_recovery.16.sleep.aql         |   1 +
 .../metadata_node_recovery.17.mgx.aql           |   1 +
 .../metadata_node_recovery.18.sleep.aql         |   1 +
 .../metadata_node_recovery.19.query.aql         |  35 ++
 .../metadata_node_recovery.2.node.aql           |  30 ++
 .../metadata_node_recovery.3.sleep.aql          |   1 +
 .../metadata_node_recovery.4.get.http           |  30 ++
 .../metadata_node_recovery.5.mgx.aql            |   1 +
 .../metadata_node_recovery.6.sleep.aql          |   1 +
 .../metadata_node_recovery.7.get.http           |  30 ++
 .../metadata_node_recovery.8.query.aql          |  34 ++
 .../metadata_node_recovery.9.ddl.aql            |  33 ++
 .../metadata_node_recovery.cluster_state.4.adm  |  34 ++
 .../metadata_node_recovery.cluster_state.7.adm  |  34 ++
 .../metadata_node_recovery.query.19.adm         |   1 +
 .../metadata_node_recovery.query.8.adm          |   1 +
 .../metadata_only_replication/testsuite.xml     |  27 +
 .../functions/ReplicaFilesRequest.java          |  24 +-
 .../management/ReplicationChannel.java          |  92 ++--
 .../management/ReplicationManager.java          |  84 ++-
 .../recovery/RemoteRecoveryManager.java         | 108 +++-
 .../storage/ReplicaResourcesManager.java        |   4 +-
 .../message/AbstractFailbackPlanMessage.java    |   4 +-
 .../message/CompleteFailbackRequestMessage.java |  92 ----
 .../CompleteFailbackResponseMessage.java        |  54 --
 .../runtime/message/NodeFailbackPlan.java       | 206 --------
 ...PreparePartitionsFailbackRequestMessage.java | 119 -----
 ...reparePartitionsFailbackResponseMessage.java |  50 --
 .../TakeoverMetadataNodeRequestMessage.java     |  69 ---
 .../TakeoverMetadataNodeResponseMessage.java    |  48 --
 .../TakeoverPartitionsRequestMessage.java       | 107 ----
 .../TakeoverPartitionsResponseMessage.java      |  60 ---
 .../asterix/runtime/utils/AppContextInfo.java   |  31 +-
 .../runtime/utils/ClusterStateManager.java      | 484 +++--------------
 .../PersistentLocalResourceRepository.java      |  35 +-
 .../logging/LogManagerWithReplication.java      |  35 +-
 .../aoya/test/AsterixYARNInstanceUtil.java      |   9 +
 .../aoya/test/AsterixYARNLibraryTestIT.java     |   1 +
 .../aoya/test/AsterixYARNLifecycleIT.java       |   1 +
 112 files changed, 4782 insertions(+), 1885 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 83233f1..54804a1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -18,23 +18,13 @@
  */
 package org.apache.asterix.api.common;
 
-import static 
org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.PropertiesAccessor;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -46,6 +36,18 @@ import 
org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static 
org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER;
+
 public class AsterixHyracksIntegrationUtil {
     static class LoggerHolder {
         static final Logger LOGGER = 
Logger.getLogger(AsterixHyracksIntegrationUtil.class.getName());
@@ -100,6 +102,12 @@ public class AsterixHyracksIntegrationUtil {
         for (Thread thread : startupThreads) {
             thread.join();
         }
+        // Wait until cluster becomes active
+        synchronized (ClusterStateManager.INSTANCE) {
+            while (ClusterStateManager.INSTANCE.getState() != 
ClusterState.ACTIVE) {
+                ClusterStateManager.INSTANCE.wait();
+            }
+        }
         hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, 
cc.getConfig().clientNetPort);
         ncs = nodeControllers.toArray(new 
NodeControllerService[nodeControllers.size()]);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b114e8c..b4b7a95 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -35,7 +35,6 @@ import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
@@ -206,7 +205,7 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext {
         activeManager = new ActiveManager(threadExecutor, 
ncApplicationContext.getNodeId(),
                 feedProperties.getMemoryComponentGlobalBudget(), 
compilerProperties.getFrameSize());
 
-        if (ClusterProperties.INSTANCE.isReplicationEnabled()) {
+        if 
(replicationProperties.isParticipant(ncApplicationContext.getNodeId())) {
             String nodeId = ncApplicationContext.getNodeId();
 
             replicaResourcesManager = new 
ReplicaResourcesManager(localResourceRepository, metadataProperties);
@@ -225,10 +224,8 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext {
              * add the partitions that will be replicated in this node as 
inactive partitions
              */
             //get nodes which replicate to this node
-            Set<String> replicationClients = 
replicationProperties.getNodeReplicationClients(nodeId);
-            //remove the node itself
-            replicationClients.remove(nodeId);
-            for (String clientId : replicationClients) {
+            Set<String> remotePrimaryReplicas = 
replicationProperties.getRemotePrimaryReplicasIds(nodeId);
+            for (String clientId : remotePrimaryReplicas) {
                 //get the partitions of each client
                 ClusterPartition[] clientPartitions = 
metadataProperties.getNodePartitions().get(clientId);
                 for (ClusterPartition partition : clientPartitions) {
@@ -475,5 +472,4 @@ public class NCAppRuntimeContext implements 
IAppRuntimeContext {
     public IStorageComponentProvider getStorageComponentProvider() {
         return componentProvider;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4edf991..2efb139 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -43,7 +43,8 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -97,9 +98,11 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         this.appCtx = appCtx;
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
-        replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
-        localResourceRepository = (PersistentLocalResourceRepository) 
txnSubsystem
-                
.getAsterixAppRuntimeContextProvider().getLocalResourceRepository();
+        ReplicationProperties repProperties = ((IPropertiesProvider) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getAppContext()).getReplicationProperties();
+        replicationEnabled = repProperties.isParticipant(txnSubsystem.getId());
+        localResourceRepository = (PersistentLocalResourceRepository) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getLocalResourceRepository();
         cachedEntityCommitsPerJobSize = 
txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
         checkpointManager = txnSubsystem.getCheckpointManager();
     }
@@ -127,15 +130,15 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         }
 
         if (replicationEnabled) {
-            if (checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN
-                    || (checkpointObject.getCheckpointLsn() == 
logMgr.getAppendLSN() && checkpointObject.isSharp())) {
-                //no logs exist or only remote logs exist
+            if (checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
+                //no logs exist
+                state = SystemState.HEALTHY;
+            } else if (checkpointObject.getCheckpointLsn() == 
logMgr.getAppendLSN() && checkpointObject.isSharp()) {
+                //only remote logs exist
                 state = SystemState.HEALTHY;
-                return state;
             } else {
                 //need to perform remote recovery
                 state = SystemState.CORRUPTED;
-                return state;
             }
         } else {
             long readableSmallestLSN = logMgr.getReadableSmallestLSN();
@@ -145,16 +148,14 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
                     //No choice but continuing when the log files are lost.
                 }
                 state = SystemState.HEALTHY;
-                return state;
             } else if (checkpointObject.getCheckpointLsn() == 
logMgr.getAppendLSN()
                     && checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
                 state = SystemState.HEALTHY;
-                return state;
             } else {
                 state = SystemState.CORRUPTED;
-                return state;
             }
         }
+        return state;
     }
 
     //This method is used only when replication is disabled.
@@ -179,6 +180,25 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     }
 
     @Override
+    public void startLocalRecovery(Set<Integer> partitions) throws 
IOException, ACIDException {
+        state = SystemState.RECOVERING;
+        LOGGER.log(Level.INFO, "starting recovery ...");
+
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        Checkpoint checkpointObject = checkpointManager.getLatest();
+        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+        if (lowWaterMarkLSN < readableSmallestLSN) {
+            lowWaterMarkLSN = readableSmallestLSN;
+        }
+
+        //delete any recovery files from previous failed recovery attempts
+        deleteRecoveryTemporaryFiles();
+
+        //get active partitions on this node
+        replayPartitionsLogs(partitions, logMgr.getLogReader(true), 
lowWaterMarkLSN);
+    }
+
+    @Override
     public synchronized void replayPartitionsLogs(Set<Integer> partitions, 
ILogReader logReader, long lowWaterMarkLSN)
             throws IOException, ACIDException {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 808a252..8e9463b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -21,11 +21,11 @@ package org.apache.asterix.app.nc;
 import java.util.concurrent.Callable;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -70,7 +70,11 @@ public class TransactionSubsystem implements 
ITransactionSubsystem {
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
         this.lockManager = new 
ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
-        final boolean replicationEnabled = 
ClusterProperties.INSTANCE.isReplicationEnabled();
+        ReplicationProperties repProperties = ((IPropertiesProvider) 
asterixAppRuntimeContextProvider
+                .getAppContext()).getReplicationProperties();
+        IReplicationStrategy replicationStrategy = 
repProperties.getReplicationStrategy();
+        final boolean replicationEnabled = repProperties.isParticipant(id);
+
         final CheckpointProperties checkpointProperties = new 
CheckpointProperties(txnProperties, id);
         checkpointManager = CheckpointManagerFactory.create(this, 
checkpointProperties, replicationEnabled);
         final Checkpoint latestCheckpoint = checkpointManager.getLatest();
@@ -80,14 +84,8 @@ public class TransactionSubsystem implements 
ITransactionSubsystem {
                             latestCheckpoint.getStorageVersion(), 
StorageConstants.VERSION));
         }
 
-        ReplicationProperties asterixReplicationProperties = null;
-        if (asterixAppRuntimeContextProvider != null) {
-            asterixReplicationProperties = ((IPropertiesProvider) 
asterixAppRuntimeContextProvider.getAppContext())
-                    .getReplicationProperties();
-        }
-
-        if (asterixReplicationProperties != null && replicationEnabled) {
-            this.logManager = new LogManagerWithReplication(this);
+        if (replicationEnabled) {
+            this.logManager = new LogManagerWithReplication(this, 
replicationStrategy);
         } else {
             this.logManager = new LogManager(this);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
new file mode 100644
index 0000000..d1edec0
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class BindMetadataNodeTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final boolean exportStub;
+
+    public BindMetadataNodeTask(boolean exportStub) {
+        this.exportStub = exportStub;
+    }
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        try {
+            if (exportStub) {
+                runtimeContext.exportMetadataNodeStub();
+            } else {
+                runtimeContext.unexportMetadataNodeStub();
+            }
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
new file mode 100644
index 0000000..8ef6ae3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CheckpointTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        ICheckpointManager checkpointMgr = 
runtimeContext.getTransactionSubsystem().getCheckpointManager();
+        checkpointMgr.doSharpCheckpoint();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
new file mode 100644
index 0000000..8e842a9
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ExternalLibrarySetupTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final boolean metadataNode;
+
+    public ExternalLibrarySetupTask(boolean metadataNode) {
+        this.metadataNode = metadataNode;
+    }
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        try {
+            
ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), 
metadataNode);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
new file mode 100644
index 0000000..203e453
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class LocalRecoveryTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    public LocalRecoveryTask(Set<Integer> partitions) {
+        this.partitions = partitions;
+    }
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        try {
+            
runtimeContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
+        } catch (IOException | ACIDException e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
new file mode 100644
index 0000000..b6d85d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class MetadataBootstrapTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        try {
+            SystemState state = 
appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
+            appContext.initializeMetadata(state == SystemState.NEW_UNIVERSE);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
new file mode 100644
index 0000000..4574304
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class RemoteRecoveryTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final Map<String, Set<Integer>> recoveryPlan;
+
+    public RemoteRecoveryTask(Map<String, Set<Integer>> recoveryPlan) {
+        this.recoveryPlan = recoveryPlan;
+    }
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        
runtimeContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan);
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws 
IOException {
+        out.defaultWriteObject();
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
new file mode 100644
index 0000000..7c30a29
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReportMaxResourceIdTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        ReportMaxResourceIdMessage.send((NodeControllerService) cs);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
new file mode 100644
index 0000000..6ae4487
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartFailbackTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        runtimeContext.getRemoteRecoveryManager().startFailbackProcess();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
new file mode 100644
index 0000000..3beb573
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.application.NCApplicationContext;
+
+public class StartLifecycleComponentsTask implements INCLifecycleTask {
+
+    private static final Logger LOGGER = 
Logger.getLogger(StartLifecycleComponentsTask.class.getName());
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        NCApplicationContext appContext = ncs.getApplicationContext();
+        MetadataProperties metadataProperties = ((IPropertiesProvider) 
runtimeContext).getMetadataProperties();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting lifecycle components");
+        }
+        Map<String, String> lifecycleMgmtConfiguration = new HashMap<>();
+        String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+        String dumpPath = 
metadataProperties.getCoredumpPath(appContext.getNodeId());
+        lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Coredump directory for NC is: " + dumpPath);
+        }
+        ILifeCycleComponentManager lccm = 
appContext.getLifeCycleComponentManager();
+        lccm.configure(lifecycleMgmtConfiguration);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Configured:" + lccm);
+        }
+        appContext.setStateDumpHandler(new 
AsterixStateDumpHandler(appContext.getNodeId(), lccm.getDumpPath(), lccm));
+        lccm.startAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
new file mode 100644
index 0000000..d060f61
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartReplicationServiceTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext runtimeContext = (IAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        try {
+            //Open replication channel
+            runtimeContext.getReplicationChannel().start();
+            final IReplicationManager replicationManager = 
runtimeContext.getReplicationManager();
+            //Check the state of remote replicas
+            replicationManager.initializeReplicasState();
+            //Start replication after the state of remote replicas has been 
initialized.
+            replicationManager.startReplicationThreads();
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
new file mode 100644
index 0000000..084563f
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
+import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
+import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.StartFailbackTask;
+import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.NodeFailbackPlan.FailbackPlanState;
+import 
org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
+import 
org.apache.asterix.app.replication.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import 
org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
+import 
org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import 
org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage;
+import 
org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage;
+import 
org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
+import 
org.apache.asterix.app.replication.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.util.FaultToleranceUtil;
+import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = 
Logger.getLogger(AutoFaultToleranceStrategy.class.getName());
+    private long clusterRequestId = 0;
+
+    private Set<String> failedNodes = new HashSet<>();
+    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new 
LinkedList<>();
+    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new 
HashMap<>();
+    private Map<Long, TakeoverPartitionsRequestMessage> 
pendingTakeoverRequests = new HashMap<>();;
+    private String currentMetadataNode;
+    private boolean metadataNodeActive = false;
+    private IClusterStateManager clusterManager;
+    private ICCMessageBroker messageBroker;
+    private IReplicationStrategy replicationStrategy;
+    private Set<String> pendingStartupCompletionNodes = new HashSet<>();
+
+    @Override
+    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        pendingStartupCompletionNodes.add(nodeId);
+    }
+
+    @Override
+    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        //if this node was waiting for failback and failed before it completed
+        if (failedNodes.contains(nodeId)) {
+            notifyFailbackPlansNodeFailure(nodeId);
+            revertFailedFailbackPlanEffects();
+            return;
+        }
+        //an active node failed
+        failedNodes.add(nodeId);
+        clusterManager.updateNodePartitions(nodeId, false);
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = false;
+            clusterManager.updateMetadataNode(nodeId, metadataNodeActive);
+        }
+        validateClusterState();
+        FaultToleranceUtil.notifyImpactedReplicas(nodeId, 
ClusterEventType.NODE_FAILURE, clusterManager, messageBroker,
+                replicationStrategy);
+        notifyFailbackPlansNodeFailure(nodeId);
+        requestPartitionsTakeover(nodeId);
+    }
+
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            plan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> iterator = 
planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                //TODO if the failing back node is still active, notify it to 
construct a new plan for it
+                iterator.remove();
+
+                //reassign the partitions that were supposed to be failed back 
to an active replica
+                requestPartitionsTakeover(plan.getNodeId());
+            }
+        }
+    }
+
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = 
getNodeAssignedPartitions(failedNodeId);
+        if (!lostPartitions.isEmpty()) {
+            for (ClusterPartition partition : lostPartitions) {
+                //find replicas for this partitions
+                Set<String> partitionReplicas = 
replicationProperties.getNodeReplicasIds(partition.getNodeId());
+                //find a replica that is still active
+                for (String replica : partitionReplicas) {
+                    //TODO (mhubail) currently this assigns the partition to 
the first found active replica.
+                    //It needs to be modified to consider load balancing.
+                    if (addActiveReplica(replica, partition, 
partitionRecoveryPlan)) {
+                        break;
+                    }
+                }
+            }
+
+            if (partitionRecoveryPlan.size() == 0) {
+                //no active replicas were found for the failed node
+                LOGGER.severe("Could not find active replicas for the 
partitions " + lostPartitions);
+                return;
+            } else {
+                LOGGER.info("Partitions to recover: " + lostPartitions);
+            }
+            //For each replica, send a request to takeover the assigned 
partitions
+            for (Entry<String, List<Integer>> entry : 
partitionRecoveryPlan.entrySet()) {
+                String replica = entry.getKey();
+                Integer[] partitionsToTakeover = entry.getValue().toArray(new 
Integer[entry.getValue().size()]);
+                long requestId = clusterRequestId++;
+                TakeoverPartitionsRequestMessage takeoverRequest = new 
TakeoverPartitionsRequestMessage(requestId,
+                        replica, partitionsToTakeover);
+                pendingTakeoverRequests.put(requestId, takeoverRequest);
+                try {
+                    messageBroker.sendApplicationMessageToNC(takeoverRequest, 
replica);
+                } catch (Exception e) {
+                    /**
+                     * if we fail to send the request, it means the NC we 
tried to send the request to
+                     * has failed. When the failure notification arrives, we 
will send any pending request
+                     * that belongs to the failed NC to a different active 
replica.
+                     */
+                    LOGGER.log(Level.WARNING, "Failed to send takeover 
request: " + takeoverRequest, e);
+                }
+            }
+        }
+    }
+
+    private boolean addActiveReplica(String replica, ClusterPartition 
partition,
+            Map<String, List<Integer>> partitionRecoveryPlan) {
+        Map<String, Map<String, String>> activeNcConfiguration = 
clusterManager.getActiveNcConfiguration();
+        if (activeNcConfiguration.containsKey(replica) && 
!failedNodes.contains(replica)) {
+            if (!partitionRecoveryPlan.containsKey(replica)) {
+                List<Integer> replicaPartitions = new ArrayList<>();
+                replicaPartitions.add(partition.getPartitionId());
+                partitionRecoveryPlan.put(replica, replicaPartitions);
+            } else {
+                
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        ReplicationProperties replicationProperties = 
AppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> nodeReplicas = 
replicationProperties.getNodeReplicasIds(failingBackNodeId);
+        clusterManager.getClusterPartitons();
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = 
clusterManager.getNodePartitions(replicaId);
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /**
+                 * if the partition original node is the returning node,
+                 * add it to the list of the partitions which will be failed 
back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), 
partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
+    }
+
+    private synchronized void processPendingFailbackPlans() {
+        ClusterState state = clusterManager.getState();
+        /**
+         * if the cluster state is not ACTIVE, then failbacks should not be 
processed
+         * since some partitions are not active
+         */
+        if (state == ClusterState.ACTIVE) {
+            while (!pendingProcessingFailbackPlans.isEmpty()) {
+                //take the first pending failback plan
+                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+                /**
+                 * A plan at this stage will be in one of two states:
+                 * 1. PREPARING -> the participants were selected but we 
haven't sent any request.
+                 * 2. PENDING_ROLLBACK -> a participant failed before we send 
any requests
+                 */
+                if (plan.getState() == FailbackPlanState.PREPARING) {
+                    //set the partitions that will be failed back as inactive
+                    String failbackNode = plan.getNodeId();
+                    for (Integer partitionId : plan.getPartitionsToFailback()) 
{
+                        //partition expected to be returned to the failing 
back node
+                        clusterManager.updateClusterPartition(partitionId, 
failbackNode, false);
+                    }
+
+                    /**
+                     * if the returning node is the original metadata node,
+                     * then metadata node will change after the failback 
completes
+                     */
+                    String originalMetadataNode = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+                    if (originalMetadataNode.equals(failbackNode)) {
+                        
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                        currentMetadataNode = "";
+                        metadataNodeActive = false;
+                        clusterManager.updateMetadataNode(currentMetadataNode, 
metadataNodeActive);
+                    }
+
+                    //force new jobs to wait
+                    clusterManager.setState(ClusterState.REBALANCING);
+                    handleFailbackRequests(plan, messageBroker);
+                    /**
+                     * wait until the current plan is completed before 
processing the next plan.
+                     * when the current one completes or is reverted, the 
cluster state will be
+                     * ACTIVE again, and the next failback plan (if any) will 
be processed.
+                     */
+                    break;
+                } else if (plan.getState() == 
FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing 
to rollback
+                    planId2FailbackPlanMap.remove(plan.getPlanId());
+                }
+            }
+        }
+    }
+
+    private void handleFailbackRequests(NodeFailbackPlan plan, 
ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete on-going jobs and prepare 
partitions for failback
+        for (PreparePartitionsFailbackRequestMessage request : 
plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeID());
+                plan.addPendingRequest(request);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: 
" + request.getNodeID(), e);
+                plan.notifyNodeFailure(request.getNodeID());
+                revertFailedFailbackPlanEffects();
+                break;
+            }
+        }
+    }
+
+    /**
+     * Returns the partitions the given node is responsible for: every partition whose active node it is,
+     * plus the partitions of any pending takeover request addressed to it. Those takeover requests are
+     * removed from the pending set because they now need to be sent to a different replica.
+     *
+     * @param nodeId the node whose partitions are requested
+     * @return the partitions assigned to {@code nodeId}
+     */
+    public synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+        final List<ClusterPartition> assigned = new ArrayList<>();
+        final Map<Integer, ClusterPartition> partitionsById = new HashMap<>();
+        for (ClusterPartition partition : clusterManager.getClusterPartitons()) {
+            partitionsById.put(partition.getPartitionId(), partition);
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                assigned.add(partition);
+            }
+        }
+
+        // takeover requests this node was supposed to handle must go to a different replica
+        final List<Long> orphanedRequestIds = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
+            if (!request.getNodeId().equals(nodeId)) {
+                continue;
+            }
+            for (Integer partitionId : request.getPartitions()) {
+                assigned.add(partitionsById.get(partitionId));
+            }
+            orphanedRequestIds.add(request.getRequestId());
+        }
+
+        // removed after iterating so the values view is not modified mid-loop
+        for (Long orphanedId : orphanedRequestIds) {
+            pendingTakeoverRequests.remove(orphanedId);
+        }
+        return assigned;
+    }
+
+    /**
+     * Marks every partition in the takeover response as active on the responding node, drops the
+     * matching pending takeover request and re-evaluates the cluster state.
+     */
+    public synchronized void process(TakeoverPartitionsResponseMessage response) throws HyracksDataException {
+        final String newActiveNode = response.getNodeId();
+        for (Integer partitionId : response.getPartitions()) {
+            clusterManager.updateClusterPartition(partitionId, newActiveNode, true);
+        }
+        final long completedRequestId = response.getRequestId();
+        pendingTakeoverRequests.remove(completedRequestId);
+        validateClusterState();
+    }
+
+    /**
+     * Records the responding node as the current, active metadata node, publishes that to the cluster
+     * state manager and re-evaluates the cluster state.
+     */
+    public synchronized void process(TakeoverMetadataNodeResponseMessage response) throws HyracksDataException {
+        this.currentMetadataNode = response.getNodeId();
+        this.metadataNodeActive = true;
+        clusterManager.updateMetadataNode(this.currentMetadataNode, this.metadataNodeActive);
+        validateClusterState();
+    }
+
+    private void validateClusterState() throws HyracksDataException {
+        clusterManager.refreshState();
+        ClusterState newState = clusterManager.getState();
+        // PENDING: all partitions are active but metadata node is not
+        if (newState == ClusterState.PENDING) {
+            requestMetadataNodeTakeover();
+        } else if (newState == ClusterState.ACTIVE) {
+            processPendingFailbackPlans();
+        }
+    }
+
+    /**
+     * Handles a participant's response to a prepare-failback request. When the plan has received all
+     * responses, the failing-back node is asked to complete the failback. When the plan failed in the
+     * meantime and this was its last pending response, the plan's effects are reverted.
+     *
+     * @param msg the participant's response
+     */
+    public synchronized void process(PreparePartitionsFailbackResponseMessage msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        if (plan == null) {
+            // e.g. a late or duplicate response for a plan that was already completed or discarded
+            LOGGER.log(Level.WARNING, "Ignoring failback prepare response for unknown plan: " + msg.getPlanId());
+            return;
+        }
+        plan.markRequestCompleted(msg.getRequestId());
+        /*
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+            // send complete resync and takeover partitions to the failing back node
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    /**
+     * Finalizes a failback plan that completed successfully:
+     * removes all references to the plan, removes the failing back node from the failed nodes,
+     * notifies its replicas that they can reconnect to it, marks its partitions as active
+     * and re-evaluates the cluster state.
+     *
+     * @param response the completion response sent by the failing back node
+     */
+    public synchronized void process(CompleteFailbackResponseMessage response) throws HyracksDataException {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
+        if (plan == null) {
+            // e.g. a duplicate completion for a plan that was already finalized
+            LOGGER.log(Level.WARNING, "Ignoring failback completion for unknown plan: " + response.getPlanId());
+            return;
+        }
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        // notify impacted replicas they can reconnect to this node
+        FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager, messageBroker,
+                replicationStrategy);
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartiton = 
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition();
+        //request the metadataPartition node to register itself as the 
metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new 
TakeoverMetadataNodeRequestMessage();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, 
metadataPartiton.getActiveNodeId());
+            // Since the metadata node will be changed, we need to rebind the 
proxy object
+            MetadataManager.INSTANCE.rebindMetadataNode();
+        } catch (Exception e) {
+
+            /**
+             * if we fail to send the request, it means the NC we tried to 
send the request to
+             * has failed. When the failure notification arrives, a new NC 
will be assigned to
+             * the metadata partition and a new metadata node takeover request 
will be sent to it.
+             */
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + 
metadataPartiton.getActiveNodeId(), e);
+        }
+    }
+
+    /**
+     * Creates a new strategy instance bound to the given replication strategy and message broker.
+     * No state of the receiving instance is used.
+     */
+    @Override
+    public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
+        final AutoFaultToleranceStrategy strategy = new AutoFaultToleranceStrategy();
+        strategy.messageBroker = messageBroker;
+        strategy.replicationStrategy = replicationStrategy;
+        return strategy;
+    }
+
+    /**
+     * Dispatches an NC lifecycle message to the handler for its type.
+     *
+     * @throws HyracksDataException if the message type is not supported by this strategy or a handler fails
+     */
+    @Override
+    public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
+        switch (message.getType()) {
+            case STARTUP_TASK_REQUEST:
+                process((StartupTaskRequestMessage) message);
+                return;
+            case STARTUP_TASK_RESULT:
+                process((NCLifecycleTaskReportMessage) message);
+                return;
+            case TAKEOVER_PARTITION_RESPONSE:
+                process((TakeoverPartitionsResponseMessage) message);
+                return;
+            case TAKEOVER_METADATA_NODE_RESPONSE:
+                process((TakeoverMetadataNodeResponseMessage) message);
+                return;
+            case PREPARE_FAILBACK_RESPONSE:
+                process((PreparePartitionsFailbackResponseMessage) message);
+                return;
+            case COMPLETE_FAILBACK_RESPONSE:
+                process((CompleteFailbackResponseMessage) message);
+                return;
+            default:
+                // handled below
+                break;
+        }
+        throw new HyracksDataException("Unsupported message type: " + message.getType().name());
+    }
+
+    /**
+     * Handles the report an NC sends after running its startup tasks.
+     * A failed startup is logged. A node that is listed as failed gets a failback plan
+     * (its replicas are notified when that failback completes). Any other node has its partitions
+     * marked active (and the metadata node, if it is the current metadata node) before the cluster
+     * state is refreshed.
+     *
+     * @param msg the startup task report
+     */
+    private synchronized void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        pendingStartupCompletionNodes.remove(nodeId);
+        if (!msg.isSuccess()) {
+            LOGGER.log(Level.SEVERE, nodeId + " failed to complete startup. ", msg.getException());
+            return;
+        }
+        if (failedNodes.contains(nodeId)) {
+            prepareFailbackPlan(nodeId);
+            return;
+        }
+        // nodeId is not in failedNodes here, so there is no failed-and-recovered case left to handle
+        clusterManager.updateNodePartitions(nodeId, true);
+        if (nodeId.equals(currentMetadataNode)) {
+            clusterManager.updateMetadataNode(currentMetadataNode, true);
+        }
+        clusterManager.refreshState();
+    }
+
+    /**
+     * Binds this strategy to the given cluster state manager and adopts that manager's current metadata node.
+     */
+    @Override
+    public void bindTo(IClusterStateManager clusterManager) {
+        this.clusterManager = clusterManager;
+        this.currentMetadataNode = this.clusterManager.getCurrentMetadataNodeId();
+    }
+
+    /**
+     * Answers an NC's startup task request with the tasks it must run: the regular startup sequence for
+     * a node reporting INITIAL_RUN or HEALTHY, the failback sequence otherwise.
+     *
+     * @throws HyracksDataException if the response cannot be sent
+     */
+    private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        final SystemState state = msg.getState();
+        final boolean regularStartup = state == SystemState.INITIAL_RUN || state == SystemState.HEALTHY;
+        // any other state means a failed node has returned and must go through failback
+        final List<INCLifecycleTask> tasks =
+                regularStartup ? buildStartupSequence(nodeId) : buildFailbackStartupSequence();
+        final StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    /**
+     * Builds the task list for a node returning after a failure, in execution order:
+     * StartFailbackTask, ReportMaxResourceIdTask, StartLifecycleComponentsTask.
+     */
+    private List<INCLifecycleTask> buildFailbackStartupSequence() {
+        final List<INCLifecycleTask> sequence = new ArrayList<>(3);
+        sequence.add(new StartFailbackTask());
+        sequence.add(new ReportMaxResourceIdTask());
+        sequence.add(new StartLifecycleComponentsTask());
+        return sequence;
+    }
+
+    /**
+     * Builds the regular startup task list for a node. The current metadata node additionally bootstraps
+     * the metadata before the external library setup and binds itself as metadata node as its last task.
+     *
+     * @param nodeId the node that will run the tasks
+     */
+    private List<INCLifecycleTask> buildStartupSequence(String nodeId) {
+        final List<INCLifecycleTask> sequence = new ArrayList<>();
+        sequence.add(new StartReplicationServiceTask());
+        final boolean metadataNode = nodeId.equals(currentMetadataNode);
+        if (metadataNode) {
+            sequence.add(new MetadataBootstrapTask());
+        }
+        sequence.add(new ExternalLibrarySetupTask(metadataNode));
+        sequence.add(new ReportMaxResourceIdTask());
+        sequence.add(new CheckpointTask());
+        sequence.add(new StartLifecycleComponentsTask());
+        if (metadataNode) {
+            sequence.add(new BindMetadataNodeTask(true));
+        }
+        return sequence;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
new file mode 100644
index 0000000..8d382a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class FaultToleranceStrategyFactory {
+
+    /** Built-in fault tolerance strategies, keyed by the lower-case name accepted in the cluster description. */
+    private static final Map<String, Class<? extends IFaultToleranceStrategy>>
+    BUILT_IN_FAULT_TOLERANCE_STRATEGY = new HashMap<>();
+
+    static {
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance", NoFaultToleranceStrategy.class);
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("metadata_node", MetadataNodeFaultToleranceStrategy.class);
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("auto", AutoFaultToleranceStrategy.class);
+    }
+
+    private FaultToleranceStrategyFactory() {
+        // static factory, never instantiated
+        throw new AssertionError();
+    }
+
+    /**
+     * Creates the fault tolerance strategy configured in the cluster's high-availability section.
+     * If high availability is not enabled, or no fault tolerance strategy is configured,
+     * a {@link NoFaultToleranceStrategy} is created. Strategy names are matched case-insensitively.
+     *
+     * @param cluster the cluster description
+     * @param repStrategy the replication strategy passed to the created strategy
+     * @param messageBroker the message broker passed to the created strategy
+     * @return a new fault tolerance strategy instance
+     * @throws IllegalArgumentException if the configured strategy is not a built-in one
+     * @throws IllegalStateException if the strategy class cannot be instantiated
+     */
+    public static IFaultToleranceStrategy create(Cluster cluster, IReplicationStrategy repStrategy,
+            ICCMessageBroker messageBroker) {
+        boolean highAvailabilityEnabled = cluster.getHighAvailability() != null
+                && cluster.getHighAvailability().getEnabled() != null
+                && Boolean.valueOf(cluster.getHighAvailability().getEnabled());
+
+        if (!highAvailabilityEnabled || cluster.getHighAvailability().getFaultTolerance() == null
+                || cluster.getHighAvailability().getFaultTolerance().getStrategy() == null) {
+            return new NoFaultToleranceStrategy().from(repStrategy, messageBroker);
+        }
+        String configuredStrategy = cluster.getHighAvailability().getFaultTolerance().getStrategy();
+        // Locale.ROOT keeps the lookup independent of the JVM default locale
+        String strategyName = configuredStrategy.toLowerCase(Locale.ROOT);
+        Class<? extends IFaultToleranceStrategy> clazz = BUILT_IN_FAULT_TOLERANCE_STRATEGY.get(strategyName);
+        if (clazz == null) {
+            throw new IllegalArgumentException(String.format(
+                    "Unsupported Fault Tolerance Strategy: %s. Available types: %s", configuredStrategy,
+                    BUILT_IN_FAULT_TOLERANCE_STRATEGY.keySet()));
+        }
+        try {
+            // unlike Class.newInstance(), constructor exceptions arrive wrapped in InvocationTargetException
+            return clazz.getDeclaredConstructor().newInstance().from(repStrategy, messageBroker);
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file

Reply via email to