[ASTERIXDB-2195][REPL] Replace Static Replication

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Redesigned all replication interfaces

Details:
- Replace static replication and fault tolerance with
  the dynamic storage API.
- Remove static-based fault tolerance strategies.
- Redesign replication APIs and classes into smaller,
  maintainable parts.
- Clean up replication properties.
- Unify logic for checkpoints when replication is
  enabled.
- Remove static replication test cases.
- Add replication runtime test cases for:
  - Bulkload component replication.
  - Memory component recovery.
  - Flushed component replication.
- Add replication integration test for:
  - Resync failed replica.

Change-Id: Ic5c4b0ac199a4530c807e558c8aebb1eb1284048
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2252
Sonar-Qube: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0a5b641a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0a5b641a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0a5b641a

Branch: refs/heads/master
Commit: 0a5b641a99f8a88d26128c334824d9a18cfbdbbe
Parents: 08dc859
Author: Murtadha Hubail <[email protected]>
Authored: Fri Jan 5 13:42:01 2018 +0300
Committer: Murtadha Hubail <[email protected]>
Committed: Fri Jan 5 08:09:50 2018 -0800

----------------------------------------------------------------------
 .../asterix/app/nc/NCAppRuntimeContext.java     |   70 +-
 .../apache/asterix/app/nc/RecoveryManager.java  |   91 +-
 .../apache/asterix/app/nc/ReplicaManager.java   |   12 +-
 .../asterix/app/nc/TransactionSubsystem.java    |    2 +-
 .../asterix/app/nc/task/RemoteRecoveryTask.java |   57 -
 .../asterix/app/nc/task/StartFailbackTask.java  |   40 -
 .../nc/task/StartReplicationServiceTask.java    |    8 +-
 .../replication/AutoFaultToleranceStrategy.java |  542 -------
 .../FaultToleranceStrategyFactory.java          |   18 +-
 .../MetadataNodeFaultToleranceStrategy.java     |  304 ----
 .../replication/NoFaultToleranceStrategy.java   |   58 +-
 .../app/replication/NodeFailbackPlan.java       |  210 ---
 .../message/CompleteFailbackRequestMessage.java |   94 --
 .../CompleteFailbackResponseMessage.java        |   61 -
 ...PreparePartitionsFailbackRequestMessage.java |  122 --
 ...reparePartitionsFailbackResponseMessage.java |   58 -
 .../ReplayPartitionLogsRequestMessage.java      |   63 -
 .../ReplayPartitionLogsResponseMessage.java     |   57 -
 .../TakeoverPartitionsRequestMessage.java       |  108 --
 .../TakeoverPartitionsResponseMessage.java      |   66 -
 .../hyracks/bootstrap/CCApplication.java        |    5 +-
 .../apache/asterix/util/FaultToleranceUtil.java |   71 -
 .../asterix-app/src/main/resources/cc-rep.conf  |    3 +-
 .../replication/bulkload/bulkload.1.sto.cmd     |   19 +
 .../replication/bulkload/bulkload.10.post.http  |   19 +
 .../bulkload/bulkload.11.pollget.http           |   21 +
 .../bulkload/bulkload.12.query.sqlpp            |   22 +
 .../replication/bulkload/bulkload.13.sto.cmd    |   19 +
 .../replication/bulkload/bulkload.14.sto.cmd    |   19 +
 .../replication/bulkload/bulkload.2.sto.cmd     |   19 +
 .../bulkload/bulkload.3.pollget.http            |   21 +
 .../bulkload/bulkload.4.pollget.http            |   21 +
 .../replication/bulkload/bulkload.5.ddl.sqlpp   |   46 +
 .../bulkload/bulkload.6.update.sqlpp            |   23 +
 .../replication/bulkload/bulkload.7.sto.cmd     |   19 +
 .../replication/bulkload/bulkload.8.sto.cmd     |   19 +
 .../replication/bulkload/bulkload.9.post.http   |   19 +
 .../flushed_component.1.sto.cmd                 |   19 +
 .../flushed_component.10.sto.cmd                |   19 +
 .../flushed_component.2.pollget.http            |   21 +
 .../flushed_component.3.ddl.sqlpp               |   23 +
 .../flushed_component.4.get.http                |   19 +
 .../flushed_component.5.sto.cmd                 |   19 +
 .../flushed_component.6.post.http               |   19 +
 .../flushed_component.7.post.http               |   19 +
 .../flushed_component.8.pollget.http            |   21 +
 .../flushed_component.9.query.sqlpp             |   21 +
 .../mem_component_recovery.1.sto.cmd            |   19 +
 .../mem_component_recovery.10.post.http         |   19 +
 .../mem_component_recovery.11.pollget.http      |   21 +
 .../mem_component_recovery.12.query.sqlpp       |   22 +
 .../mem_component_recovery.13.sto.cmd           |   19 +
 .../mem_component_recovery.14.sto.cmd           |   19 +
 .../mem_component_recovery.2.sto.cmd            |   19 +
 .../mem_component_recovery.3.pollget.http       |   21 +
 .../mem_component_recovery.4.pollget.http       |   21 +
 .../mem_component_recovery.5.ddl.sqlpp          |   46 +
 .../mem_component_recovery.6.update.sqlpp       |   25 +
 .../mem_component_recovery.7.sto.cmd            |   19 +
 .../mem_component_recovery.8.sto.cmd            |   19 +
 .../mem_component_recovery.9.post.http          |   19 +
 .../metadata_failover.12.sto.cmd                |   19 +
 .../test/resources/runtimets/replication.xml    |   15 +
 .../cluster_state_1/cluster_state_1.1.regexadm  |    8 +-
 .../cluster_state_1_full.1.regexadm             |    8 +-
 .../cluster_state_1_less.1.regexadm             |    8 +-
 .../replication/bulkload/bulkload.10.adm        |    1 +
 .../results/replication/bulkload/bulkload.3.adm |    7 +
 .../results/replication/bulkload/bulkload.4.adm |    7 +
 .../results/replication/bulkload/bulkload.7.adm |    0
 .../results/replication/bulkload/bulkload.8.adm |    0
 .../results/replication/bulkload/bulkload.9.adm |   38 +
 .../flushed_component/flushed_component.2.adm   |    7 +
 .../flushed_component/flushed_component.4.adm   |    1 +
 .../flushed_component/flushed_component.6.adm   |    0
 .../flushed_component/flushed_component.7.adm   |    0
 .../flushed_component/flushed_component.8.adm   |   38 +
 .../flushed_component/flushed_component.9.adm   |    1 +
 .../mem_component_recovery.10.adm               |    1 +
 .../mem_component_recovery.3.adm                |    7 +
 .../mem_component_recovery.4.adm                |    7 +
 .../mem_component_recovery.7.adm                |    0
 .../mem_component_recovery.8.adm                |    0
 .../mem_component_recovery.9.adm                |   38 +
 .../metadata_failover/metadata_failover.11.adm  |    4 +-
 .../common/api/INcApplicationContext.java       |    6 -
 .../common/config/ReplicationProperties.java    |   80 +-
 .../AllDatasetsReplicationStrategy.java         |   27 +
 .../ChainedDeclusteringReplicationStrategy.java |  105 --
 .../replication/IFaultToleranceStrategy.java    |    8 +-
 .../common/replication/INCLifecycleMessage.java |    8 -
 .../common/replication/IPartitionReplica.java   |    7 +
 .../replication/IRemoteRecoveryManager.java     |   71 -
 .../replication/IReplicaResourcesManager.java   |   36 -
 .../replication/IReplicationDestination.java    |   60 +
 .../common/replication/IReplicationManager.java |  106 +-
 .../replication/IReplicationStrategy.java       |   34 -
 .../common/replication/IReplicationThread.java  |   46 -
 .../MetadataOnlyReplicationStrategy.java        |   79 --
 .../replication/NoReplicationStrategy.java      |   32 -
 .../asterix/common/replication/Replica.java     |  112 --
 .../common/replication/ReplicaEvent.java        |   67 -
 .../replication/ReplicationStrategyFactory.java |   22 +-
 .../common/storage/ResourceReference.java       |    4 +
 .../asterix/common/transactions/ILogRecord.java |   19 +-
 .../common/transactions/ILogRequester.java      |   29 +
 .../common/transactions/IRecoveryManager.java   |   11 +
 .../asterix/common/transactions/LogRecord.java  |   41 +-
 asterixdb/asterix-replication/pom.xml           |   12 -
 .../asterix/replication/api/IReplicaTask.java   |    3 +-
 .../replication/api/IReplicationMessage.java    |    2 +-
 .../replication/api/IReplicationWorker.java     |   39 +
 .../replication/api/PartitionReplica.java       |  168 +++
 .../replication/api/ReplicationDestination.java |  128 ++
 .../functions/ReplicaFilesRequest.java          |   70 -
 .../functions/ReplicaIndexFlushRequest.java     |   60 -
 .../functions/ReplicaLogsRequest.java           |   71 -
 .../functions/ReplicationProtocol.java          |  354 -----
 .../replication/logging/RemoteLogMapping.java   |   62 -
 .../replication/logging/RemoteLogRecord.java    |   44 +
 .../replication/logging/RemoteLogsNotifier.java |  102 ++
 .../logging/RemoteLogsProcessor.java            |   88 ++
 .../logging/ReplicationLogBuffer.java           |   13 +-
 .../asterix/replication/logging/TxnAck.java     |   49 +
 .../replication/logging/TxnAckTracker.java      |   63 +
 .../asterix/replication/logging/TxnLogUtil.java |   35 -
 .../management/IndexReplicationManager.java     |  184 +++
 .../management/LogReplicationManager.java       |  258 ++++
 .../management/ReplicaStateChecker.java         |   76 -
 .../management/ReplicationChannel.java          |  622 +-------
 .../management/ReplicationManager.java          | 1336 +-----------------
 .../CheckpointPartitionIndexesTask.java         |    5 +-
 .../messaging/ComponentMaskTask.java            |   92 ++
 .../replication/messaging/DeleteFileTask.java   |    5 +-
 .../replication/messaging/DropIndexTask.java    |   84 ++
 .../messaging/MarkComponentValidTask.java       |  110 ++
 .../PartitionResourcesListResponse.java         |    1 -
 .../messaging/PartitionResourcesListTask.java   |   13 +-
 .../messaging/ReplicateFileTask.java            |   29 +-
 .../messaging/ReplicateLogsTask.java            |   92 ++
 .../messaging/ReplicationProtocol.java          |  200 +++
 .../replication/recovery/FileSynchronizer.java  |   73 -
 .../recovery/RemoteRecoveryManager.java         |  320 -----
 .../recovery/ReplicaFilesSynchronizer.java      |   84 --
 .../recovery/ReplicaSynchronizer.java           |   66 -
 .../storage/LSMComponentLSNSyncTask.java        |   39 -
 .../storage/LSMComponentProperties.java         |  159 ---
 .../storage/LSMIndexFileProperties.java         |  109 --
 .../replication/storage/PartitionReplica.java   |  160 ---
 .../storage/ReplicaResourcesManager.java        |  226 ---
 .../replication/sync/FileSynchronizer.java      |   77 +
 .../replication/sync/IndexSynchronizer.java     |  140 ++
 .../sync/ReplicaFilesSynchronizer.java          |   86 ++
 .../replication/sync/ReplicaSynchronizer.java   |   65 +
 .../src/test/resources/data/fbu.adm             |   10 -
 .../runtime/message/ReplicaEventMessage.java    |   65 -
 .../server/test/MetadataReplicationIT.java      |  168 ---
 .../asterix/server/test/ReplicationIT.java      |   65 +-
 .../resources/MetadataReplicationIT/cc.conf     |   52 -
 .../MetadataReplicationIT/ncservice1.conf       |   20 -
 .../MetadataReplicationIT/ncservice2.conf       |   21 -
 .../src/test/resources/ReplicationIT/cc.conf    |    1 +
 .../metadata_node_recovery.1.ddl.aql            |   50 -
 .../metadata_node_recovery.10.node.aql          |   30 -
 .../metadata_node_recovery.11.sleep.aql         |   19 -
 .../metadata_node_recovery.12.node.aql          |   19 -
 .../metadata_node_recovery.13.sleep.aql         |   19 -
 .../metadata_node_recovery.14.ddl.aql           |   33 -
 .../metadata_node_recovery.15.node.aql          |   30 -
 .../metadata_node_recovery.16.sleep.aql         |   19 -
 .../metadata_node_recovery.17.node.aql          |   19 -
 .../metadata_node_recovery.18.sleep.aql         |   19 -
 .../metadata_node_recovery.19.query.aql         |   35 -
 .../metadata_node_recovery.2.node.aql           |   30 -
 .../metadata_node_recovery.3.sleep.aql          |   19 -
 .../metadata_node_recovery.4.get.http           |   30 -
 .../metadata_node_recovery.5.node.aql           |   19 -
 .../metadata_node_recovery.6.sleep.aql          |   19 -
 .../metadata_node_recovery.7.get.http           |   30 -
 .../metadata_node_recovery.8.query.aql          |   34 -
 .../metadata_node_recovery.9.ddl.aql            |   33 -
 .../metadata_node_recovery.cluster_state.4.adm  |   38 -
 .../metadata_node_recovery.cluster_state.7.adm  |   38 -
 .../metadata_node_recovery.query.19.adm         |    1 -
 .../metadata_node_recovery.query.8.adm          |    1 -
 .../metadata_only_replication/testsuite.xml     |   27 -
 .../node_failback/node_failback.1.ddl.aql       |   59 -
 .../node_failback/node_failback.10.get.http     |   30 -
 .../node_failback/node_failback.11.query.aql    |   33 -
 .../node_failback/node_failback.2.update.aql    |   36 -
 .../node_failback/node_failback.3.node.aql      |   28 -
 .../node_failback/node_failback.4.sleep.aql     |   19 -
 .../node_failback/node_failback.5.get.http      |   30 -
 .../node_failback/node_failback.6.query.aql     |   33 -
 .../node_failback/node_failback.7.update.aql    |   51 -
 .../node_failback/node_failback.8.node.aql      |   28 -
 .../node_failback/node_failback.9.sleep.aql     |   19 -
 .../failover/bulkload/bulkload.1.ddl.aql        |   54 -
 .../failover/bulkload/bulkload.2.update.aql     |   33 -
 .../failover/bulkload/bulkload.3.node.aql       |   28 -
 .../failover/bulkload/bulkload.4.sleep.aql      |   19 -
 .../failover/bulkload/bulkload.5.query.aql      |   31 -
 .../mem_component_recovery.1.ddl.aql            |   58 -
 .../mem_component_recovery.2.update.aql         |   34 -
 .../mem_component_recovery.3.node.aql           |   28 -
 .../mem_component_recovery.4.sleep.aql          |   19 -
 .../mem_component_recovery.5.query.aql          |   32 -
 .../metadata_node/metadata_node.1.ddl.aql       |   55 -
 .../metadata_node/metadata_node.2.node.aql      |   28 -
 .../metadata_node/metadata_node.3.sleep.aql     |   19 -
 .../metadata_node/metadata_node.4.query.aql     |   36 -
 .../resync_failed_replica.1.sto.cmd             |   19 +
 .../resync_failed_replica.10.pollget.http       |   21 +
 .../resync_failed_replica.11.post.http          |   19 +
 .../resync_failed_replica.12.post.http          |   19 +
 .../resync_failed_replica.13.pollget.http       |   21 +
 .../resync_failed_replica.14.query.sqlpp        |   21 +
 .../resync_failed_replica.15.ddl.sqlpp          |   19 +
 .../resync_failed_replica.16.query.sqlpp        |   21 +
 .../resync_failed_replica.17.sto.cmd            |   19 +
 .../resync_failed_replica.2.pollget.http        |   21 +
 .../resync_failed_replica.3.ddl.sqlpp           |   23 +
 .../resync_failed_replica.4.node.cmd            |   19 +
 .../resync_failed_replica.5.pollget.http        |   21 +
 .../resync_failed_replica.6.pollget.http        |   21 +
 .../resync_failed_replica.7.node.cmd            |   19 +
 .../resync_failed_replica.8.pollget.http        |   21 +
 .../resync_failed_replica.9.sto.cmd             |   19 +
 .../node_failback.cluster_state.10.adm          |   38 -
 .../node_failback.cluster_state.5.adm           |   38 -
 .../node_failback/node_failback.query.11.adm    |    1 -
 .../node_failback/node_failback.query.6.adm     |    1 -
 .../results/failover/bulkload/bulkload.5.adm    |    1 -
 .../mem_component_recovery.5.adm                |    1 -
 .../failover/metadata_node/metadata_node.4.adm  |    1 -
 .../resync_failed_replica.10.adm                |    7 +
 .../resync_failed_replica.11.adm                |    0
 .../resync_failed_replica.12.adm                |    0
 .../resync_failed_replica.13.adm                |   38 +
 .../resync_failed_replica.14.adm                |    1 +
 .../resync_failed_replica.16.adm                |    3 +
 .../resync_failed_replica.2.adm                 |    7 +
 .../resync_failed_replica.5.adm                 |   38 +
 .../resync_failed_replica.6.adm                 |    7 +
 .../resync_failed_replica.8.adm                 |   38 +
 .../integrationts/replication/testsuite.xml     |   21 +-
 .../PersistentLocalResourceRepository.java      |   72 +-
 ...ersistentLocalResourceRepositoryFactory.java |   15 +-
 .../management/service/logging/LogBuffer.java   |    9 +-
 .../logging/LogManagerWithReplication.java      |   10 +-
 .../recovery/CheckpointManagerFactory.java      |    8 +-
 .../recovery/ReplicationCheckpointManager.java  |  144 --
 .../api/replication/IReplicationJob.java        |    4 +
 253 files changed, 4156 insertions(+), 8776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 425cbe4..9c53c18 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -54,11 +54,8 @@ import 
org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
-import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -72,10 +69,8 @@ import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.asterix.replication.management.ReplicationManager;
-import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.runtime.utils.NoOpCoordinationService;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -139,8 +134,6 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private ActiveManager activeManager;
     private IReplicationChannel replicationChannel;
     private IReplicationManager replicationManager;
-    private IRemoteRecoveryManager remoteRecoveryManager;
-    private IReplicaResourcesManager replicaResourcesManager;
     private final ILibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;
@@ -190,8 +183,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         indexCheckpointManagerProvider = new 
IndexCheckpointManagerProvider(ioManager);
 
         ILocalResourceRepositoryFactory 
persistentLocalResourceRepositoryFactory =
-                new PersistentLocalResourceRepositoryFactory(ioManager, 
getServiceContext().getNodeId(),
-                        metadataProperties, indexCheckpointManagerProvider);
+                new PersistentLocalResourceRepositoryFactory(ioManager, 
indexCheckpointManagerProvider);
 
         localResourceRepository =
                 (PersistentLocalResourceRepository) 
persistentLocalResourceRepositoryFactory.createRepository();
@@ -224,47 +216,21 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                 this.ncServiceContext);
 
         if (replicationProperties.isReplicationEnabled()) {
+            replicationManager = new ReplicationManager(this, 
replicationProperties);
 
-            replicaResourcesManager = new 
ReplicaResourcesManager(localResourceRepository, metadataProperties,
-                    indexCheckpointManagerProvider);
-
-            replicationManager = new ReplicationManager(nodeId, 
replicationProperties, replicaResourcesManager,
-                    txnSubsystem.getLogManager(), 
asterixAppRuntimeContextProvider, ncServiceContext);
-
-            if 
(replicationManager.getReplicationStrategy().isParticipant(getServiceContext().getNodeId()))
 {
-
-                //pass replication manager to replication required object
-                //LogManager to replicate logs
-                
txnSubsystem.getLogManager().setReplicationManager(replicationManager);
-
-                //PersistentLocalResourceRepository to replicate metadata 
files and delete backups on drop index
-                
localResourceRepository.setReplicationManager(replicationManager);
-
-                /*
-                 * add the partitions that will be replicated in this node as 
inactive partitions
-                 */
-                //get nodes which replicate to this node
-                Set<String> remotePrimaryReplicas = 
replicationManager.getReplicationStrategy()
-                        
.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
-                for (String clientId : remotePrimaryReplicas) {
-                    //get the partitions of each client
-                    ClusterPartition[] clientPartitions = 
metadataProperties.getNodePartitions().get(clientId);
-                    for (ClusterPartition partition : clientPartitions) {
-                        
localResourceRepository.addInactivePartition(partition.getPartitionId());
-                    }
-                }
+            //pass replication manager to replication required object
+            //LogManager to replicate logs
+            
txnSubsystem.getLogManager().setReplicationManager(replicationManager);
 
-                //initialize replication channel
-                replicationChannel = new ReplicationChannel(nodeId, 
replicationProperties, txnSubsystem.getLogManager(),
-                        replicaResourcesManager, replicationManager, 
getServiceContext(),
-                        asterixAppRuntimeContextProvider, 
replicationManager.getReplicationStrategy());
+            //PersistentLocalResourceRepository to replicate metadata files 
and delete backups on drop index
+            localResourceRepository.setReplicationManager(replicationManager);
 
-                remoteRecoveryManager = new 
RemoteRecoveryManager(replicationManager, this, replicationProperties);
+            //initialize replication channel
+            replicationChannel = new ReplicationChannel(this);
 
-                bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
-                        storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory(),
-                        replicationManager);
-            }
+            bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
+                    storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory(),
+                    replicationManager);
         } else {
             bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
                     storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory());
@@ -441,16 +407,6 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     }
 
     @Override
-    public IReplicaResourcesManager getReplicaResourcesManager() {
-        return replicaResourcesManager;
-    }
-
-    @Override
-    public IRemoteRecoveryManager getRemoteRecoveryManager() {
-        return remoteRecoveryManager;
-    }
-
-    @Override
     public IReplicationManager getReplicationManager() {
         return replicationManager;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f458688..080ad48 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -46,7 +47,6 @@ import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.Checkpoint;
@@ -96,16 +96,17 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     private final ICheckpointManager checkpointManager;
     private SystemState state;
     private final INCServiceContext serviceCtx;
+    private final INcApplicationContext appCtx;
+
 
     public RecoveryManager(ITransactionSubsystem txnSubsystem, 
INCServiceContext serviceCtx) {
         this.serviceCtx = serviceCtx;
         this.txnSubsystem = txnSubsystem;
+        this.appCtx = 
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
         logMgr = (LogManager) txnSubsystem.getLogManager();
-        ReplicationProperties repProperties = 
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
-                .getReplicationProperties();
+        ReplicationProperties repProperties = 
appCtx.getReplicationProperties();
         replicationEnabled = repProperties.isReplicationEnabled();
-        localResourceRepository = (PersistentLocalResourceRepository) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getLocalResourceRepository();
+        localResourceRepository = (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
         cachedEntityCommitsPerJobSize = 
txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
         checkpointManager = txnSubsystem.getCheckpointManager();
     }
@@ -129,32 +130,18 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
             LOGGER.info("The checkpoint file doesn't exist: systemState = 
PERMANENT_DATA_LOSS");
             return state;
         }
-
-        if (replicationEnabled) {
-            if (checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
-                //no logs exist
-                state = SystemState.HEALTHY;
-            } else if (checkpointObject.getCheckpointLsn() == 
logMgr.getAppendLSN() && checkpointObject.isSharp()) {
-                //only remote logs exist
-                state = SystemState.HEALTHY;
-            } else {
-                //need to perform remote recovery
-                state = SystemState.CORRUPTED;
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        if (logMgr.getAppendLSN() == readableSmallestLSN) {
+            if (checkpointObject.getMinMCTFirstLsn() != 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
+                LOGGER.warn("Some(or all) of transaction log files are lost.");
+                //No choice but continuing when the log files are lost.
             }
+            state = SystemState.HEALTHY;
+        } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
+                && checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
+            state = SystemState.HEALTHY;
         } else {
-            long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-            if (logMgr.getAppendLSN() == readableSmallestLSN) {
-                if (checkpointObject.getMinMCTFirstLsn() != 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
-                    LOGGER.warn("Some(or all) of transaction log files are 
lost.");
-                    //No choice but continuing when the log files are lost.
-                }
-                state = SystemState.HEALTHY;
-            } else if (checkpointObject.getCheckpointLsn() == 
logMgr.getAppendLSN()
-                    && checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
-                state = SystemState.HEALTHY;
-            } else {
-                state = SystemState.CORRUPTED;
-            }
+            state = SystemState.CORRUPTED;
         }
         return state;
     }
@@ -442,9 +429,47 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     }
 
     private long getRemoteMinFirstLSN() throws HyracksDataException {
-        IReplicaResourcesManager remoteResourcesManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getAppContext().getReplicaResourcesManager();
-        return 
remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
+        // find the min first lsn of partitions that are replicated on this 
node
+        final Set<Integer> allPartitions = 
localResourceRepository.getAllPartitions();
+        final INcApplicationContext appContext = 
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
+        final Set<Integer> masterPartitions = 
appContext.getReplicaManager().getPartitions();
+        allPartitions.removeAll(masterPartitions);
+        return getPartitionsMinLSN(allPartitions);
+    }
+
+    private long getPartitionsMinLSN(Set<Integer> partitions) throws 
HyracksDataException {
+        final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = 
appCtx.getIndexCheckpointManagerProvider();
+        long minRemoteLSN = Long.MAX_VALUE;
+        for (Integer partition : partitions) {
+            final List<DatasetResourceReference> partitionResources = 
localResourceRepository.getResources(resource -> {
+                DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
+                return dsResource.getPartition() == partition;
+            
}).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            for (DatasetResourceReference indexRef : partitionResources) {
+                long remoteIndexMaxLSN = 
idxCheckpointMgrProvider.get(indexRef).getLowWatermark();
+                minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+            }
+        }
+        return minRemoteLSN;
+    }
+
+    @Override
+    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean 
flush) throws HyracksDataException {
+        long minLSN = getPartitionsMinLSN(partitions);
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        if (minLSN < readableSmallestLSN) {
+            minLSN = readableSmallestLSN;
+        }
+
+        //replay logs > minLSN that belong to these partitions
+        try {
+            replayPartitionsLogs(partitions, logMgr.getLogReader(true), 
minLSN);
+            if (flush) {
+                appCtx.getDatasetLifecycleManager().flushAllDatasets();
+            }
+        } catch (IOException | ACIDException e) {
+            throw HyracksDataException.create(e);
+        }
     }
 
     @Override
@@ -535,7 +560,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         TxnEntityId loserEntity;
         List<Long> undoLSNSet = null;
         //get active partitions on this node
-        Set<Integer> activePartitions = localResourceRepository.getActivePartitions();
+        Set<Integer> activePartitions = appCtx.getReplicaManager().getPartitions();
         ILogReader logReader = logMgr.getLogReader(false);
         try {
             logReader.setPosition(firstLSN);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index bf17a5b..155fa1d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -29,10 +29,10 @@ import java.util.stream.Stream;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IPartitionReplica;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
-import org.apache.asterix.replication.storage.PartitionReplica;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ReplicaManager implements IReplicaManager {
@@ -67,7 +67,9 @@ public class ReplicaManager implements IReplicaManager {
         if (!replicas.containsKey(id)) {
             throw new IllegalStateException("replica with id(" + id + ") does not exist");
         }
-        replicas.remove(id);
+        PartitionReplica replica = replicas.remove(id);
+        appCtx.getReplicationManager().unregister(replica);
+
     }
 
     @Override
@@ -83,8 +85,8 @@ public class ReplicaManager implements IReplicaManager {
 
     @Override
     public synchronized void promote(int partition) throws HyracksDataException {
-        final IRemoteRecoveryManager remoteRecoveryManager = appCtx.getRemoteRecoveryManager();
-        remoteRecoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
+        final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
+        recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
         partitions.add(partition);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index cd9b617..fae0413 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -76,7 +76,7 @@ public class TransactionSubsystem implements ITransactionSubsystem {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.log(Level.INFO, "Checkpoint Properties: " + checkpointProperties);
         }
-        checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
+        checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties);
         final Checkpoint latestCheckpoint = checkpointManager.getLatest();
         if (latestCheckpoint != null) {
             transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
deleted file mode 100644
index d330684..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc.task;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class RemoteRecoveryTask implements INCLifecycleTask {
-
-    private static final long serialVersionUID = 1L;
-    private final Map<String, Set<Integer>> recoveryPlan;
-
-    public RemoteRecoveryTask(Map<String, Set<Integer>> recoveryPlan) {
-        this.recoveryPlan = recoveryPlan;
-    }
-
-    @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
-        INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
-        appContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan);
-    }
-
-    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
-    }
-
-    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.defaultReadObject();
-    }
-
-    @Override
-    public String toString() {
-        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"recovery-plan\" : " + recoveryPlan + " }";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
deleted file mode 100644
index ecd93a3..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc.task;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class StartFailbackTask implements INCLifecycleTask {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void perform(IControllerService cs) throws HyracksDataException {
-        INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
-        appContext.getRemoteRecoveryManager().startFailbackProcess();
-    }
-
-    @Override
-    public String toString() {
-        return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
index 253f121..7071271 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -20,7 +20,6 @@ package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
@@ -32,13 +31,8 @@ public class StartReplicationServiceTask implements INCLifecycleTask {
     public void perform(IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
-            //Open replication channel
+            // open replication channel
             appContext.getReplicationChannel().start();
-            final IReplicationManager replicationManager = appContext.getReplicationManager();
-            //Check the state of remote replicas
-            replicationManager.initializeReplicasState();
-            //Start replication after the state of remote replicas has been initialized.
-            replicationManager.startReplicationThreads();
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
deleted file mode 100644
index 37b4d4f..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
-import org.apache.asterix.app.nc.task.CheckpointTask;
-import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
-import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
-import org.apache.asterix.app.nc.task.StartFailbackTask;
-import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
-import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
-import org.apache.asterix.app.replication.NodeFailbackPlan.FailbackPlanState;
-import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.app.replication.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
-import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
-import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
-import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
-import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
-import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.app.replication.message.TakeoverPartitionsResponseMessage;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.replication.IFaultToleranceStrategy;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.util.FaultToleranceUtil;
-import org.apache.hyracks.api.application.ICCServiceContext;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    private long clusterRequestId = 0;
-
-    private Set<String> failedNodes = new HashSet<>();
-    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new LinkedList<>();
-    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new HashMap<>();
-    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = new HashMap<>();;
-    private String currentMetadataNode;
-    private boolean metadataNodeActive = false;
-    private IClusterStateManager clusterManager;
-    private ICCMessageBroker messageBroker;
-    private IReplicationStrategy replicationStrategy;
-    private ICCServiceContext serviceCtx;
-    private Set<String> pendingStartupCompletionNodes = new HashSet<>();
-    private List<String> nodeIds;
-    private Map<String, SystemState> startupQueue = new HashMap<>();
-
-    @Override
-    public void notifyNodeJoin(String nodeId) {
-        pendingStartupCompletionNodes.add(nodeId);
-    }
-
-    @Override
-    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
-        //if this node was waiting for failback and failed before it completed
-        if (failedNodes.contains(nodeId)) {
-            notifyFailbackPlansNodeFailure(nodeId);
-            revertFailedFailbackPlanEffects();
-            return;
-        }
-        //an active node failed
-        failedNodes.add(nodeId);
-        clusterManager.updateNodePartitions(nodeId, false);
-        if (nodeId.equals(currentMetadataNode)) {
-            metadataNodeActive = false;
-            clusterManager.updateMetadataNode(nodeId, metadataNodeActive);
-        }
-        validateClusterState();
-        FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE, clusterManager, messageBroker,
-                replicationStrategy);
-        notifyFailbackPlansNodeFailure(nodeId);
-        requestPartitionsTakeover(nodeId);
-    }
-
-    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
-        for (NodeFailbackPlan plan : planId2FailbackPlanMap.values()) {
-            plan.notifyNodeFailure(nodeId);
-        }
-    }
-
-    private synchronized void revertFailedFailbackPlanEffects() {
-        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
-        while (iterator.hasNext()) {
-            NodeFailbackPlan plan = iterator.next();
-            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-                //TODO if the failing back node is still active, notify it to construct a new plan for it
-                iterator.remove();
-
-                //reassign the partitions that were supposed to be failed back to an active replica
-                requestPartitionsTakeover(plan.getNodeId());
-            }
-        }
-    }
-
-    private synchronized void requestPartitionsTakeover(String failedNodeId) {
-        //replica -> list of partitions to takeover
-        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        //collect the partitions of the failed NC
-        List<ClusterPartition> lostPartitions = 
getNodeAssignedPartitions(failedNodeId);
-        if (!lostPartitions.isEmpty()) {
-            for (ClusterPartition partition : lostPartitions) {
-                //find replicas for this partitions
-                Set<String> partitionReplicas = 
replicationStrategy.getRemoteReplicas(partition.getNodeId()).stream()
-                        .map(Replica::getId).collect(Collectors.toSet());
-                //find a replica that is still active
-                for (String replica : partitionReplicas) {
-                    //TODO (mhubail) currently this assigns the partition to 
the first found active replica.
-                    //It needs to be modified to consider load balancing.
-                    if (addActiveReplica(replica, partition, 
partitionRecoveryPlan)) {
-                        break;
-                    }
-                }
-            }
-
-            if (partitionRecoveryPlan.size() == 0) {
-                //no active replicas were found for the failed node
-                LOGGER.error("Could not find active replicas for the 
partitions " + lostPartitions);
-                return;
-            } else {
-                LOGGER.info("Partitions to recover: " + lostPartitions);
-            }
-            //For each replica, send a request to takeover the assigned 
partitions
-            partitionRecoveryPlan.forEach((replica, value) -> {
-                Integer[] partitionsToTakeover = value.toArray(new 
Integer[value.size()]);
-                long requestId = clusterRequestId++;
-                TakeoverPartitionsRequestMessage takeoverRequest = new 
TakeoverPartitionsRequestMessage(requestId,
-                        replica, partitionsToTakeover);
-                pendingTakeoverRequests.put(requestId, takeoverRequest);
-                try {
-                    messageBroker.sendApplicationMessageToNC(takeoverRequest, 
replica);
-                } catch (Exception e) {
-                    /*
-                     * if we fail to send the request, it means the NC we 
tried to send the request to
-                     * has failed. When the failure notification arrives, we 
will send any pending request
-                     * that belongs to the failed NC to a different active 
replica.
-                     */
-                    LOGGER.log(Level.WARN, "Failed to send takeover request: " 
+ takeoverRequest, e);
-                }
-            });
-        }
-    }
-
-    private boolean addActiveReplica(String replica, ClusterPartition 
partition,
-            Map<String, List<Integer>> partitionRecoveryPlan) {
-        final Set<String> participantNodes = 
clusterManager.getParticipantNodes();
-        if (participantNodes.contains(replica) && 
!failedNodes.contains(replica)) {
-            if (!partitionRecoveryPlan.containsKey(replica)) {
-                List<Integer> replicaPartitions = new ArrayList<>();
-                replicaPartitions.add(partition.getPartitionId());
-                partitionRecoveryPlan.put(replica, replicaPartitions);
-            } else {
-                
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-            }
-            return true;
-        }
-        return false;
-    }
-
-    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
-        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
-        pendingProcessingFailbackPlans.add(plan);
-        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
-        //get all partitions this node requires to resync
-        Set<String> nodeReplicas = 
replicationStrategy.getRemoteReplicas(failingBackNodeId).stream().map(Replica::getId)
-                .collect(Collectors.toSet());
-        clusterManager.getClusterPartitons();
-        for (String replicaId : nodeReplicas) {
-            ClusterPartition[] nodePartitions = 
clusterManager.getNodePartitions(replicaId);
-            for (ClusterPartition partition : nodePartitions) {
-                plan.addParticipant(partition.getActiveNodeId());
-                /*
-                 * if the partition original node is the returning node,
-                 * add it to the list of the partitions which will be failed 
back
-                 */
-                if (partition.getNodeId().equals(failingBackNodeId)) {
-                    plan.addPartitionToFailback(partition.getPartitionId(), 
partition.getActiveNodeId());
-                }
-            }
-        }
-
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Prepared Failback plan: " + plan.toString());
-        }
-
-        processPendingFailbackPlans();
-    }
-
-    private synchronized void processPendingFailbackPlans() {
-        ClusterState state = clusterManager.getState();
-        /*
-         * if the cluster state is not ACTIVE, then failbacks should not be 
processed
-         * since some partitions are not active
-         */
-        if (state == ClusterState.ACTIVE) {
-            while (!pendingProcessingFailbackPlans.isEmpty()) {
-                //take the first pending failback plan
-                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
-                /*
-                 * A plan at this stage will be in one of two states:
-                 * 1. PREPARING -> the participants were selected but we 
haven't sent any request.
-                 * 2. PENDING_ROLLBACK -> a participant failed before we send 
any requests
-                 */
-                if (plan.getState() == FailbackPlanState.PREPARING) {
-                    //set the partitions that will be failed back as inactive
-                    String failbackNode = plan.getNodeId();
-                    for (Integer partitionId : plan.getPartitionsToFailback()) 
{
-                        //partition expected to be returned to the failing 
back node
-                        clusterManager.updateClusterPartition(partitionId, 
failbackNode, false);
-                    }
-
-                    /*
-                     * if the returning node is the original metadata node,
-                     * then metadata node will change after the failback 
completes
-                     */
-                    ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-                    String originalMetadataNode = 
appCtx.getMetadataProperties().getMetadataNodeName();
-                    if (originalMetadataNode.equals(failbackNode)) {
-                        
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
-                        currentMetadataNode = "";
-                        metadataNodeActive = false;
-                        clusterManager.updateMetadataNode(currentMetadataNode, 
metadataNodeActive);
-                    }
-
-                    //force new jobs to wait
-                    clusterManager.setState(ClusterState.REBALANCING);
-                    handleFailbackRequests(plan, messageBroker);
-                    /*
-                     * wait until the current plan is completed before 
processing the next plan.
-                     * when the current one completes or is reverted, the 
cluster state will be
-                     * ACTIVE again, and the next failback plan (if any) will 
be processed.
-                     */
-                    break;
-                } else if (plan.getState() == 
FailbackPlanState.PENDING_ROLLBACK) {
-                    //this plan failed before sending any requests -> nothing 
to rollback
-                    planId2FailbackPlanMap.remove(plan.getPlanId());
-                }
-            }
-        }
-    }
-
-    private void handleFailbackRequests(NodeFailbackPlan plan, 
ICCMessageBroker messageBroker) {
-        //send requests to other nodes to complete on-going jobs and prepare 
partitions for failback
-        for (PreparePartitionsFailbackRequestMessage request : 
plan.getPlanFailbackRequests()) {
-            try {
-                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeID());
-                plan.addPendingRequest(request);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARN, "Failed to send failback request to: " 
+ request.getNodeID(), e);
-                plan.notifyNodeFailure(request.getNodeID());
-                revertFailedFailbackPlanEffects();
-                break;
-            }
-        }
-    }
-
-    public synchronized List<ClusterPartition> 
getNodeAssignedPartitions(String nodeId) {
-        List<ClusterPartition> nodePartitions = new ArrayList<>();
-        ClusterPartition[] clusterPartitons = 
clusterManager.getClusterPartitons();
-        Map<Integer, ClusterPartition> clusterPartitionsMap = new HashMap<>();
-        for (ClusterPartition partition : clusterPartitons) {
-            clusterPartitionsMap.put(partition.getPartitionId(), partition);
-        }
-        for (ClusterPartition partition : clusterPartitons) {
-            if (nodeId.equals(partition.getActiveNodeId())) {
-                nodePartitions.add(partition);
-            }
-        }
-        /*
-         * if there is any pending takeover request this node was supposed to 
handle,
-         * it needs to be sent to a different replica
-         */
-        List<Long> failedTakeoverRequests = new ArrayList<>();
-        for (TakeoverPartitionsRequestMessage request : 
pendingTakeoverRequests.values()) {
-            if (request.getNodeId().equals(nodeId)) {
-                for (Integer partitionId : request.getPartitions()) {
-                    nodePartitions.add(clusterPartitionsMap.get(partitionId));
-                }
-                failedTakeoverRequests.add(request.getRequestId());
-            }
-        }
-
-        //remove failed requests
-        for (Long requestId : failedTakeoverRequests) {
-            pendingTakeoverRequests.remove(requestId);
-        }
-        return nodePartitions;
-    }
-
-    public synchronized void process(TakeoverPartitionsResponseMessage 
response) throws HyracksDataException {
-        for (Integer partitonId : response.getPartitions()) {
-            clusterManager.updateClusterPartition(partitonId, 
response.getNodeId(), true);
-        }
-        pendingTakeoverRequests.remove(response.getRequestId());
-        validateClusterState();
-    }
-
-    public synchronized void process(MetadataNodeResponseMessage response) 
throws HyracksDataException {
-        currentMetadataNode = response.getNodeId();
-        metadataNodeActive = true;
-        clusterManager.updateMetadataNode(currentMetadataNode, 
metadataNodeActive);
-        validateClusterState();
-    }
-
-    private void validateClusterState() throws HyracksDataException {
-        clusterManager.refreshState();
-        ClusterState newState = clusterManager.getState();
-        // PENDING: all partitions are active but metadata node is not
-        if (newState == ClusterState.PENDING) {
-            requestMetadataNodeTakeover();
-        } else if (newState == ClusterState.ACTIVE) {
-            processPendingFailbackPlans();
-        }
-    }
-
-    public synchronized void process(PreparePartitionsFailbackResponseMessage 
msg) {
-        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
-        plan.markRequestCompleted(msg.getRequestId());
-        /*
-         * A plan at this stage will be in one of three states:
-         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still 
expected (wait).
-         * 2. PENDING_COMPLETION -> all responses received (time to send 
completion request).
-         * 3. PENDING_ROLLBACK -> the plan failed and we just received the 
final pending response (revert).
-         */
-        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
-            CompleteFailbackRequestMessage request = 
plan.getCompleteFailbackRequestMessage();
-
-            //send complete resync and takeover partitions to the failing back 
node
-            try {
-                messageBroker.sendApplicationMessageToNC(request, 
request.getNodeId());
-            } catch (Exception e) {
-                LOGGER.log(Level.WARN, "Failed to send complete failback 
request to: " + request.getNodeId(), e);
-                notifyFailbackPlansNodeFailure(request.getNodeId());
-                revertFailedFailbackPlanEffects();
-            }
-        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
-            revertFailedFailbackPlanEffects();
-        }
-    }
-
-    public synchronized void process(CompleteFailbackResponseMessage response) 
throws HyracksDataException {
-        /*
-         * the failback plan completed successfully:
-         * Remove all references to it.
-         * Remove the the failing back node from the failed nodes list.
-         * Notify its replicas to reconnect to it.
-         * Set the failing back node partitions as active.
-         */
-        NodeFailbackPlan plan = 
planId2FailbackPlanMap.remove(response.getPlanId());
-        String nodeId = plan.getNodeId();
-        failedNodes.remove(nodeId);
-        //notify impacted replicas they can reconnect to this node
-        FaultToleranceUtil.notifyImpactedReplicas(nodeId, 
ClusterEventType.NODE_JOIN, clusterManager, messageBroker,
-                replicationStrategy);
-        clusterManager.updateNodePartitions(nodeId, true);
-        validateClusterState();
-    }
-
-    private synchronized void requestMetadataNodeTakeover() {
-        //need a new node to takeover metadata node
-        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-        ClusterPartition metadataPartiton = 
appCtx.getMetadataProperties().getMetadataPartition();
-        //request the metadataPartition node to register itself as the 
metadata node
-        MetadataNodeRequestMessage takeoverRequest = new 
MetadataNodeRequestMessage(true);
-        try {
-            messageBroker.sendApplicationMessageToNC(takeoverRequest, 
metadataPartiton.getActiveNodeId());
-            // Since the metadata node will be changed, we need to rebind the 
proxy object
-            MetadataManager.INSTANCE.rebindMetadataNode();
-        } catch (Exception e) {
-            /*
-             * if we fail to send the request, it means the NC we tried to 
send the request to
-             * has failed. When the failure notification arrives, a new NC 
will be assigned to
-             * the metadata partition and a new metadata node takeover request 
will be sent to it.
-             */
-            LOGGER.log(Level.WARN,
-                    "Failed to send metadata node takeover request to: " + 
metadataPartiton.getActiveNodeId(), e);
-        }
-    }
-
-    @Override
-    public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, 
IReplicationStrategy replicationStrategy) {
-        AutoFaultToleranceStrategy ft = new AutoFaultToleranceStrategy();
-        ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
-        ft.replicationStrategy = replicationStrategy;
-        ft.serviceCtx = serviceCtx;
-        ft.nodeIds = serviceCtx.getAppConfig().getNCNames();
-        return ft;
-    }
-
-    @Override
-    public synchronized void process(INCLifecycleMessage message) throws 
HyracksDataException {
-        switch (message.getType()) {
-            case REGISTRATION_TASKS_REQUEST:
-                process((RegistrationTasksRequestMessage) message);
-                break;
-            case REGISTRATION_TASKS_RESULT:
-                process((NCLifecycleTaskReportMessage) message);
-                break;
-            case TAKEOVER_PARTITION_RESPONSE:
-                process((TakeoverPartitionsResponseMessage) message);
-                break;
-            case METADATA_NODE_RESPONSE:
-                process((MetadataNodeResponseMessage) message);
-                break;
-            case PREPARE_FAILBACK_RESPONSE:
-                process((PreparePartitionsFailbackResponseMessage) message);
-                break;
-            case COMPLETE_FAILBACK_RESPONSE:
-                process((CompleteFailbackResponseMessage) message);
-                break;
-            default:
-                throw new 
RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, 
message.getType().name());
-        }
-    }
-
-    private synchronized void process(NCLifecycleTaskReportMessage msg) throws 
HyracksDataException {
-        final String nodeId = msg.getNodeId();
-        pendingStartupCompletionNodes.remove(nodeId);
-        if (msg.isSuccess()) {
-            if (failedNodes.contains(nodeId)) {
-                prepareFailbackPlan(nodeId);
-                return;
-            }
-            // If this node failed and recovered, notify impacted replicas to 
reconnect to it
-            if (replicationStrategy.isParticipant(nodeId) && 
failedNodes.remove(nodeId)) {
-                FaultToleranceUtil.notifyImpactedReplicas(nodeId, 
ClusterEventType.NODE_JOIN, clusterManager,
-                        messageBroker, replicationStrategy);
-            }
-            clusterManager.updateNodePartitions(nodeId, true);
-            if (msg.getNodeId().equals(currentMetadataNode)) {
-                clusterManager.updateMetadataNode(currentMetadataNode, true);
-            }
-            clusterManager.refreshState();
-        } else {
-            LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete 
startup. ", msg.getException());
-        }
-    }
-
-    @Override
-    public void bindTo(IClusterStateManager clusterManager) {
-        this.clusterManager = clusterManager;
-        currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
-    }
-
-    private synchronized void process(RegistrationTasksRequestMessage msg) 
throws HyracksDataException {
-        final String nodeId = msg.getNodeId();
-        final SystemState state = msg.getState();
-        //last node needed to start
-        if (startupQueue.keySet().size() == nodeIds.size() - 1) {
-            startupQueue.put(nodeId, state);
-            for (Map.Entry<String, SystemState> nodeState : 
startupQueue.entrySet()) {
-                List<INCLifecycleTask> tasks = 
buildStartupSequence(nodeState.getKey());
-                RegistrationTasksResponseMessage response = new 
RegistrationTasksResponseMessage(nodeState.getKey(),
-                        tasks);
-                try {
-                    messageBroker.sendApplicationMessageToNC(response, 
nodeState.getKey());
-                } catch (Exception e) {
-                    throw HyracksDataException.create(e);
-                }
-            }
-        } else if (!failedNodes.isEmpty()) {
-            List<INCLifecycleTask> tasks = buildFailbackStartupSequence();
-            RegistrationTasksResponseMessage response = new 
RegistrationTasksResponseMessage(nodeId, tasks);
-            try {
-                messageBroker.sendApplicationMessageToNC(response, 
msg.getNodeId());
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
-            }
-        } else {
-            startupQueue.put(nodeId, state);
-        }
-    }
-
-    private List<INCLifecycleTask> buildFailbackStartupSequence() {
-        final List<INCLifecycleTask> tasks = new ArrayList<>();
-        tasks.add(new StartFailbackTask());
-        tasks.add(new ReportLocalCountersTask());
-        tasks.add(new StartLifecycleComponentsTask());
-        return tasks;
-    }
-
-    private List<INCLifecycleTask> buildStartupSequence(String nodeId) {
-        final List<INCLifecycleTask> tasks = new ArrayList<>();
-        tasks.add(new StartReplicationServiceTask());
-        final boolean isMetadataNode = nodeId.equals(currentMetadataNode);
-        if (isMetadataNode) {
-            tasks.add(new MetadataBootstrapTask());
-        }
-        tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
-        tasks.add(new ReportLocalCountersTask());
-        tasks.add(new CheckpointTask());
-        tasks.add(new StartLifecycleComponentsTask());
-        if (isMetadataNode) {
-            tasks.add(new BindMetadataNodeTask(true));
-        }
-        return tasks;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
index fe24fca..5a715d6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
@@ -18,9 +18,7 @@
  */
 package org.apache.asterix.app.replication;
 
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
-import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.hyracks.api.application.ICCServiceContext;
 
 public class FaultToleranceStrategyFactory {
@@ -29,20 +27,10 @@ public class FaultToleranceStrategyFactory {
         throw new AssertionError();
     }
 
-    public static final String STRATEGY_NAME = "metadata_only";
-
-    public static IFaultToleranceStrategy create(ICCServiceContext serviceCtx, 
ReplicationProperties repProp,
-            IReplicationStrategy strategy) {
-        Class<? extends IFaultToleranceStrategy> clazz;
-        if (!repProp.isReplicationEnabled()) {
-            clazz = NoFaultToleranceStrategy.class;
-        } else if (STRATEGY_NAME.equals(repProp.getReplicationStrategy())) {
-            clazz = MetadataNodeFaultToleranceStrategy.class;
-        } else {
-            clazz = AutoFaultToleranceStrategy.class;
-        }
+    public static IFaultToleranceStrategy create(ICCServiceContext serviceCtx, 
boolean replicationEnabled) {
+        Class<? extends IFaultToleranceStrategy> clazz = 
NoFaultToleranceStrategy.class;
         try {
-            return clazz.newInstance().from(serviceCtx, strategy);
+            return clazz.newInstance().from(serviceCtx, replicationEnabled);
         } catch (InstantiationException | IllegalAccessException e) {
             throw new IllegalStateException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
deleted file mode 100644
index 02174da..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
-import org.apache.asterix.app.nc.task.CheckpointTask;
-import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
-import org.apache.asterix.app.nc.task.LocalRecoveryTask;
-import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.RemoteRecoveryTask;
-import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
-import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
-import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
-import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
-import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
-import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
-import 
org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
-import 
org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
-import 
org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage;
-import 
org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.replication.IFaultToleranceStrategy;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
-import org.apache.asterix.util.FaultToleranceUtil;
-import org.apache.hyracks.api.application.ICCServiceContext;
-import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class MetadataNodeFaultToleranceStrategy implements 
IFaultToleranceStrategy {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    private IClusterStateManager clusterManager;
-    private String metadataNodeId;
-    private IReplicationStrategy replicationStrategy;
-    private ICCMessageBroker messageBroker;
-    private ICCServiceContext serviceCtx;
-    private final Set<String> hotStandbyMetadataReplica = new HashSet<>();
-    private final Set<String> failedNodes = new HashSet<>();
-    private Set<String> pendingStartupCompletionNodes = new HashSet<>();
-
-    @Override
-    public synchronized void notifyNodeJoin(String nodeId) throws 
HyracksDataException {
-        pendingStartupCompletionNodes.add(nodeId);
-    }
-
-    @Override
-    public synchronized void notifyNodeFailure(String nodeId) throws 
HyracksDataException {
-        failedNodes.add(nodeId);
-        hotStandbyMetadataReplica.remove(nodeId);
-        clusterManager.updateNodePartitions(nodeId, false);
-        if (nodeId.equals(metadataNodeId)) {
-            clusterManager.updateMetadataNode(metadataNodeId, false);
-        }
-        clusterManager.refreshState();
-        if (replicationStrategy.isParticipant(nodeId)) {
-            // Notify impacted replica
-            FaultToleranceUtil.notifyImpactedReplicas(nodeId, 
ClusterEventType.NODE_FAILURE, clusterManager,
-                    messageBroker, replicationStrategy);
-        }
-        // If the failed node is the metadata node, ask its replicas to replay 
any committed jobs
-        if (nodeId.equals(metadataNodeId)) {
-            ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-            int metadataPartitionId = 
appCtx.getMetadataProperties().getMetadataPartition().getPartitionId();
-            Set<Integer> metadataPartition = new 
HashSet<>(Arrays.asList(metadataPartitionId));
-            Set<Replica> activeRemoteReplicas = 
replicationStrategy.getRemoteReplicas(metadataNodeId).stream()
-                    .filter(replica -> 
!failedNodes.contains(replica.getId())).collect(Collectors.toSet());
-            //TODO Do election to identity the node with latest state
-            for (Replica replica : activeRemoteReplicas) {
-                ReplayPartitionLogsRequestMessage msg = new 
ReplayPartitionLogsRequestMessage(metadataPartition);
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, 
replica.getId());
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARN, "Failed sending an application 
message to an NC", e);
-                    continue;
-                }
-            }
-        }
-    }
-
-    @Override
-    public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, 
IReplicationStrategy replicationStrategy) {
-        MetadataNodeFaultToleranceStrategy ft = new 
MetadataNodeFaultToleranceStrategy();
-        ft.replicationStrategy = replicationStrategy;
-        ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
-        ft.serviceCtx = serviceCtx;
-        return ft;
-    }
-
-    @Override
-    public synchronized void process(INCLifecycleMessage message) throws 
HyracksDataException {
-        switch (message.getType()) {
-            case REGISTRATION_TASKS_REQUEST:
-                process((RegistrationTasksRequestMessage) message);
-                break;
-            case REGISTRATION_TASKS_RESULT:
-                process((NCLifecycleTaskReportMessage) message);
-                break;
-            case REPLAY_LOGS_RESPONSE:
-                process((ReplayPartitionLogsResponseMessage) message);
-                break;
-            case METADATA_NODE_RESPONSE:
-                process((MetadataNodeResponseMessage) message);
-                break;
-            default:
-                throw new 
RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, 
message.getType().name());
-        }
-    }
-
-    @Override
-    public synchronized void bindTo(IClusterStateManager clusterManager) {
-        this.clusterManager = clusterManager;
-        this.metadataNodeId = clusterManager.getCurrentMetadataNodeId();
-    }
-
-    @Override
-    public void notifyMetadataNodeChange(String node) throws 
HyracksDataException {
-        if (metadataNodeId.equals(node)) {
-            return;
-        }
-        // if current metadata node is active, we need to unbind its metadata 
proxy object
-        if (clusterManager.isMetadataNodeActive()) {
-            MetadataNodeRequestMessage msg = new 
MetadataNodeRequestMessage(false);
-            try {
-                messageBroker.sendApplicationMessageToNC(msg, metadataNodeId);
-                // when the current node responses, we will bind to the new one
-                metadataNodeId = node;
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
-            }
-        } else {
-            requestMetadataNodeTakeover(node);
-        }
-    }
-
-    private synchronized void process(ReplayPartitionLogsResponseMessage msg) {
-        hotStandbyMetadataReplica.add(msg.getNodeId());
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Hot Standby Metadata Replicas: " + 
hotStandbyMetadataReplica);
-        }
-    }
-
-    private synchronized void process(RegistrationTasksRequestMessage msg) 
throws HyracksDataException {
-        final String nodeId = msg.getNodeId();
-        final SystemState state = msg.getState();
-        final boolean isParticipant = 
replicationStrategy.isParticipant(nodeId);
-        List<INCLifecycleTask> tasks;
-        if (!isParticipant) {
-            tasks = buildNonParticipantStartupSequence(nodeId, state);
-        } else {
-            tasks = buildParticipantStartupSequence(nodeId, state);
-        }
-        RegistrationTasksResponseMessage response = new 
RegistrationTasksResponseMessage(nodeId, tasks);
-        try {
-            messageBroker.sendApplicationMessageToNC(response, 
msg.getNodeId());
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    private synchronized void process(NCLifecycleTaskReportMessage msg) throws 
HyracksDataException {
-        final String nodeId = msg.getNodeId();
-        pendingStartupCompletionNodes.remove(nodeId);
-        if (msg.isSuccess()) {
-            // If this node failed and recovered, notify impacted replicas to 
reconnect to it
-            if (replicationStrategy.isParticipant(nodeId) && 
failedNodes.remove(nodeId)) {
-                FaultToleranceUtil.notifyImpactedReplicas(nodeId, 
ClusterEventType.NODE_JOIN, clusterManager,
-                        messageBroker, replicationStrategy);
-            }
-            clusterManager.updateNodePartitions(msg.getNodeId(), true);
-            if (msg.getNodeId().equals(metadataNodeId)) {
-                clusterManager.updateMetadataNode(metadataNodeId, true);
-                // When metadata node is active, it is the only hot standby 
replica
-                hotStandbyMetadataReplica.clear();
-                hotStandbyMetadataReplica.add(metadataNodeId);
-            }
-            clusterManager.refreshState();
-        } else {
-            LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete 
startup. ", msg.getException());
-        }
-    }
-
-    private List<INCLifecycleTask> buildNonParticipantStartupSequence(String 
nodeId, SystemState state) {
-        final List<INCLifecycleTask> tasks = new ArrayList<>();
-        if (state == SystemState.CORRUPTED) {
-            //need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new 
LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
-                    
.stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
-            tasks.add(rt);
-        }
-        tasks.add(new ExternalLibrarySetupTask(false));
-        tasks.add(new ReportLocalCountersTask());
-        tasks.add(new CheckpointTask());
-        tasks.add(new StartLifecycleComponentsTask());
-        return tasks;
-    }
-
-    private List<INCLifecycleTask> buildParticipantStartupSequence(String 
nodeId, SystemState state) {
-        final List<INCLifecycleTask> tasks = new ArrayList<>();
-        switch (state) {
-            case PERMANENT_DATA_LOSS:
-                if (failedNodes.isEmpty()) { //bootstrap
-                    break;
-                }
-                // If the metadata node (or replica) failed and lost its data
-                // => Metadata Remote Recovery from standby replica
-                tasks.add(getMetadataPartitionRecoveryPlan());
-                // Checkpoint after remote recovery to move node to HEALTHY 
state
-                tasks.add(new CheckpointTask());
-                break;
-            case CORRUPTED:
-                // If the metadata node (or replica) failed and started again 
without losing data => Local Recovery
-                LocalRecoveryTask rt = new 
LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
-                        
.stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
-                tasks.add(rt);
-                break;
-            case BOOTSTRAPPING:
-            case HEALTHY:
-            case RECOVERING:
-                break;
-            default:
-                break;
-        }
-        tasks.add(new StartReplicationServiceTask());
-        final boolean isMetadataNode = nodeId.equals(metadataNodeId);
-        if (isMetadataNode) {
-            tasks.add(new MetadataBootstrapTask());
-        }
-        tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
-        tasks.add(new ReportLocalCountersTask());
-        tasks.add(new CheckpointTask());
-        tasks.add(new StartLifecycleComponentsTask());
-        if (isMetadataNode) {
-            tasks.add(new BindMetadataNodeTask(true));
-        }
-        return tasks;
-    }
-
-    private RemoteRecoveryTask getMetadataPartitionRecoveryPlan() {
-        if (hotStandbyMetadataReplica.isEmpty()) {
-            throw new IllegalStateException("No metadata replicas to recover 
from");
-        }
-        // Construct recovery plan: Node => Set of partitions to recover from 
it
-        Map<String, Set<Integer>> recoveryPlan = new HashMap<>();
-        // Recover metadata partition from any metadata hot standby replica
-        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-        int metadataPartitionId = 
appCtx.getMetadataProperties().getMetadataPartition().getPartitionId();
-        Set<Integer> metadataPartition = new 
HashSet<>(Arrays.asList(metadataPartitionId));
-        recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), 
metadataPartition);
-        return new RemoteRecoveryTask(recoveryPlan);
-    }
-
-    private void process(MetadataNodeResponseMessage response) throws 
HyracksDataException {
-        clusterManager.updateMetadataNode(response.getNodeId(), 
response.isExported());
-        if (!response.isExported()) {
-            requestMetadataNodeTakeover(metadataNodeId);
-        }
-    }
-
-    private void requestMetadataNodeTakeover(String node) throws 
HyracksDataException {
-        MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true);
-        try {
-            messageBroker.sendApplicationMessageToNC(msg, node);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-}
\ No newline at end of file

Reply via email to