[ASTERIXDB-2195][REPL] Replace Static Replication - user model changes: no - storage format changes: no - interface changes: yes - Redesigned all replication interfaces
Details: - Replace static replication and fault tolerance by dynamic storage API. - Remove static based fault tolerance strategies. - Redesign replication APIs and classes to smaller maintainable parts. - Clean up replication properties. - Unify logic for checkpoints when replication is enabled. - Remove static replication test cases. - Add replication runtime test cases for: - Bulkload component replication. - Memory component recovery. - Flushed component replication. - Add replication integration test for: - Resync failed replica. Change-Id: Ic5c4b0ac199a4530c807e558c8aebb1eb1284048 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2252 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0a5b641a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0a5b641a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0a5b641a Branch: refs/heads/master Commit: 0a5b641a99f8a88d26128c334824d9a18cfbdbbe Parents: 08dc859 Author: Murtadha Hubail <[email protected]> Authored: Fri Jan 5 13:42:01 2018 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Fri Jan 5 08:09:50 2018 -0800 ---------------------------------------------------------------------- .../asterix/app/nc/NCAppRuntimeContext.java | 70 +- .../apache/asterix/app/nc/RecoveryManager.java | 91 +- .../apache/asterix/app/nc/ReplicaManager.java | 12 +- .../asterix/app/nc/TransactionSubsystem.java | 2 +- .../asterix/app/nc/task/RemoteRecoveryTask.java | 57 - .../asterix/app/nc/task/StartFailbackTask.java | 40 - .../nc/task/StartReplicationServiceTask.java | 8 +- .../replication/AutoFaultToleranceStrategy.java | 542 ------- .../FaultToleranceStrategyFactory.java | 18 +- .../MetadataNodeFaultToleranceStrategy.java | 304 ---- .../replication/NoFaultToleranceStrategy.java | 58 +- .../app/replication/NodeFailbackPlan.java | 210 --- .../message/CompleteFailbackRequestMessage.java | 94 -- .../CompleteFailbackResponseMessage.java | 61 - ...PreparePartitionsFailbackRequestMessage.java | 122 -- ...reparePartitionsFailbackResponseMessage.java | 58 - .../ReplayPartitionLogsRequestMessage.java | 63 - .../ReplayPartitionLogsResponseMessage.java | 57 - .../TakeoverPartitionsRequestMessage.java | 108 -- .../TakeoverPartitionsResponseMessage.java | 66 - .../hyracks/bootstrap/CCApplication.java | 5 +- .../apache/asterix/util/FaultToleranceUtil.java | 71 - .../asterix-app/src/main/resources/cc-rep.conf | 3 +- .../replication/bulkload/bulkload.1.sto.cmd | 19 + .../replication/bulkload/bulkload.10.post.http | 19 + .../bulkload/bulkload.11.pollget.http | 21 + .../bulkload/bulkload.12.query.sqlpp | 22 + .../replication/bulkload/bulkload.13.sto.cmd | 19 + .../replication/bulkload/bulkload.14.sto.cmd | 19 + .../replication/bulkload/bulkload.2.sto.cmd | 19 + .../bulkload/bulkload.3.pollget.http | 21 + .../bulkload/bulkload.4.pollget.http | 21 + .../replication/bulkload/bulkload.5.ddl.sqlpp | 46 + .../bulkload/bulkload.6.update.sqlpp | 23 + .../replication/bulkload/bulkload.7.sto.cmd | 19 + .../replication/bulkload/bulkload.8.sto.cmd | 19 + .../replication/bulkload/bulkload.9.post.http | 19 + .../flushed_component.1.sto.cmd | 19 + .../flushed_component.10.sto.cmd | 19 + .../flushed_component.2.pollget.http | 21 + .../flushed_component.3.ddl.sqlpp | 23 + .../flushed_component.4.get.http | 19 + .../flushed_component.5.sto.cmd | 19 + .../flushed_component.6.post.http | 19 + .../flushed_component.7.post.http | 19 + .../flushed_component.8.pollget.http | 21 + .../flushed_component.9.query.sqlpp | 21 + .../mem_component_recovery.1.sto.cmd | 19 + .../mem_component_recovery.10.post.http | 19 + .../mem_component_recovery.11.pollget.http | 21 + .../mem_component_recovery.12.query.sqlpp | 22 + .../mem_component_recovery.13.sto.cmd | 19 + .../mem_component_recovery.14.sto.cmd | 19 + .../mem_component_recovery.2.sto.cmd | 19 + .../mem_component_recovery.3.pollget.http | 21 + .../mem_component_recovery.4.pollget.http | 21 + .../mem_component_recovery.5.ddl.sqlpp | 46 + .../mem_component_recovery.6.update.sqlpp | 25 + .../mem_component_recovery.7.sto.cmd | 19 + .../mem_component_recovery.8.sto.cmd | 19 + .../mem_component_recovery.9.post.http | 19 + .../metadata_failover.12.sto.cmd | 19 + .../test/resources/runtimets/replication.xml | 15 + .../cluster_state_1/cluster_state_1.1.regexadm | 8 +- .../cluster_state_1_full.1.regexadm | 8 +- .../cluster_state_1_less.1.regexadm | 8 +- .../replication/bulkload/bulkload.10.adm | 1 + .../results/replication/bulkload/bulkload.3.adm | 7 + .../results/replication/bulkload/bulkload.4.adm | 7 + .../results/replication/bulkload/bulkload.7.adm | 0 .../results/replication/bulkload/bulkload.8.adm | 0 .../results/replication/bulkload/bulkload.9.adm | 38 + .../flushed_component/flushed_component.2.adm | 7 + .../flushed_component/flushed_component.4.adm | 1 + .../flushed_component/flushed_component.6.adm | 0 .../flushed_component/flushed_component.7.adm | 0 .../flushed_component/flushed_component.8.adm | 38 + .../flushed_component/flushed_component.9.adm | 1 + .../mem_component_recovery.10.adm | 1 + .../mem_component_recovery.3.adm | 7 + .../mem_component_recovery.4.adm | 7 + .../mem_component_recovery.7.adm | 0 .../mem_component_recovery.8.adm | 0 .../mem_component_recovery.9.adm | 38 + .../metadata_failover/metadata_failover.11.adm | 4 +- .../common/api/INcApplicationContext.java | 6 - .../common/config/ReplicationProperties.java | 80 +- .../AllDatasetsReplicationStrategy.java | 27 + .../ChainedDeclusteringReplicationStrategy.java | 105 -- .../replication/IFaultToleranceStrategy.java | 8 +- .../common/replication/INCLifecycleMessage.java | 8 - .../common/replication/IPartitionReplica.java | 7 + .../replication/IRemoteRecoveryManager.java | 71 - .../replication/IReplicaResourcesManager.java | 36 - .../replication/IReplicationDestination.java | 60 + .../common/replication/IReplicationManager.java | 106 +- .../replication/IReplicationStrategy.java | 34 - .../common/replication/IReplicationThread.java | 46 - .../MetadataOnlyReplicationStrategy.java | 79 -- .../replication/NoReplicationStrategy.java | 32 - .../asterix/common/replication/Replica.java | 112 -- .../common/replication/ReplicaEvent.java | 67 - .../replication/ReplicationStrategyFactory.java | 22 +- .../common/storage/ResourceReference.java | 4 + .../asterix/common/transactions/ILogRecord.java | 19 +- .../common/transactions/ILogRequester.java | 29 + .../common/transactions/IRecoveryManager.java | 11 + .../asterix/common/transactions/LogRecord.java | 41 +- asterixdb/asterix-replication/pom.xml | 12 - .../asterix/replication/api/IReplicaTask.java | 3 +- .../replication/api/IReplicationMessage.java | 2 +- .../replication/api/IReplicationWorker.java | 39 + .../replication/api/PartitionReplica.java | 168 +++ .../replication/api/ReplicationDestination.java | 128 ++ .../functions/ReplicaFilesRequest.java | 70 - .../functions/ReplicaIndexFlushRequest.java | 60 - .../functions/ReplicaLogsRequest.java | 71 - .../functions/ReplicationProtocol.java | 354 ----- .../replication/logging/RemoteLogMapping.java | 62 - .../replication/logging/RemoteLogRecord.java | 44 + .../replication/logging/RemoteLogsNotifier.java | 102 ++ .../logging/RemoteLogsProcessor.java | 88 ++ .../logging/ReplicationLogBuffer.java | 13 +- .../asterix/replication/logging/TxnAck.java | 49 + .../replication/logging/TxnAckTracker.java | 63 + .../asterix/replication/logging/TxnLogUtil.java | 35 - .../management/IndexReplicationManager.java | 184 +++ .../management/LogReplicationManager.java | 258 ++++ .../management/ReplicaStateChecker.java | 76 - .../management/ReplicationChannel.java | 622 +------- .../management/ReplicationManager.java | 1336 +----------------- .../CheckpointPartitionIndexesTask.java | 5 +- .../messaging/ComponentMaskTask.java | 92 ++ .../replication/messaging/DeleteFileTask.java | 5 +- .../replication/messaging/DropIndexTask.java | 84 ++ .../messaging/MarkComponentValidTask.java | 110 ++ .../PartitionResourcesListResponse.java | 1 - .../messaging/PartitionResourcesListTask.java | 13 +- .../messaging/ReplicateFileTask.java | 29 +- .../messaging/ReplicateLogsTask.java | 92 ++ .../messaging/ReplicationProtocol.java | 200 +++ .../replication/recovery/FileSynchronizer.java | 73 - .../recovery/RemoteRecoveryManager.java | 320 ----- .../recovery/ReplicaFilesSynchronizer.java | 84 -- .../recovery/ReplicaSynchronizer.java | 66 - .../storage/LSMComponentLSNSyncTask.java | 39 - .../storage/LSMComponentProperties.java | 159 --- .../storage/LSMIndexFileProperties.java | 109 -- .../replication/storage/PartitionReplica.java | 160 --- .../storage/ReplicaResourcesManager.java | 226 --- .../replication/sync/FileSynchronizer.java | 77 + .../replication/sync/IndexSynchronizer.java | 140 ++ .../sync/ReplicaFilesSynchronizer.java | 86 ++ .../replication/sync/ReplicaSynchronizer.java | 65 + .../src/test/resources/data/fbu.adm | 10 - .../runtime/message/ReplicaEventMessage.java | 65 - .../server/test/MetadataReplicationIT.java | 168 --- .../asterix/server/test/ReplicationIT.java | 65 +- .../resources/MetadataReplicationIT/cc.conf | 52 - .../MetadataReplicationIT/ncservice1.conf | 20 - .../MetadataReplicationIT/ncservice2.conf | 21 - .../src/test/resources/ReplicationIT/cc.conf | 1 + .../metadata_node_recovery.1.ddl.aql | 50 - .../metadata_node_recovery.10.node.aql | 30 - .../metadata_node_recovery.11.sleep.aql | 19 - .../metadata_node_recovery.12.node.aql | 19 - .../metadata_node_recovery.13.sleep.aql | 19 - .../metadata_node_recovery.14.ddl.aql | 33 - .../metadata_node_recovery.15.node.aql | 30 - .../metadata_node_recovery.16.sleep.aql | 19 - .../metadata_node_recovery.17.node.aql | 19 - .../metadata_node_recovery.18.sleep.aql | 19 - .../metadata_node_recovery.19.query.aql | 35 - .../metadata_node_recovery.2.node.aql | 30 - .../metadata_node_recovery.3.sleep.aql | 19 - .../metadata_node_recovery.4.get.http | 30 - .../metadata_node_recovery.5.node.aql | 19 - .../metadata_node_recovery.6.sleep.aql | 19 - .../metadata_node_recovery.7.get.http | 30 - .../metadata_node_recovery.8.query.aql | 34 - .../metadata_node_recovery.9.ddl.aql | 33 - .../metadata_node_recovery.cluster_state.4.adm | 38 - .../metadata_node_recovery.cluster_state.7.adm | 38 - .../metadata_node_recovery.query.19.adm | 1 - .../metadata_node_recovery.query.8.adm | 1 - .../metadata_only_replication/testsuite.xml | 27 - .../node_failback/node_failback.1.ddl.aql | 59 - .../node_failback/node_failback.10.get.http | 30 - .../node_failback/node_failback.11.query.aql | 33 - .../node_failback/node_failback.2.update.aql | 36 - .../node_failback/node_failback.3.node.aql | 28 - .../node_failback/node_failback.4.sleep.aql | 19 - .../node_failback/node_failback.5.get.http | 30 - .../node_failback/node_failback.6.query.aql | 33 - .../node_failback/node_failback.7.update.aql | 51 - .../node_failback/node_failback.8.node.aql | 28 - .../node_failback/node_failback.9.sleep.aql | 19 - .../failover/bulkload/bulkload.1.ddl.aql | 54 - .../failover/bulkload/bulkload.2.update.aql | 33 - .../failover/bulkload/bulkload.3.node.aql | 28 - .../failover/bulkload/bulkload.4.sleep.aql | 19 - .../failover/bulkload/bulkload.5.query.aql | 31 - .../mem_component_recovery.1.ddl.aql | 58 - .../mem_component_recovery.2.update.aql | 34 - .../mem_component_recovery.3.node.aql | 28 - .../mem_component_recovery.4.sleep.aql | 19 - .../mem_component_recovery.5.query.aql | 32 - .../metadata_node/metadata_node.1.ddl.aql | 55 - .../metadata_node/metadata_node.2.node.aql | 28 - .../metadata_node/metadata_node.3.sleep.aql | 19 - .../metadata_node/metadata_node.4.query.aql | 36 - .../resync_failed_replica.1.sto.cmd | 19 + .../resync_failed_replica.10.pollget.http | 21 + .../resync_failed_replica.11.post.http | 19 + .../resync_failed_replica.12.post.http | 19 + .../resync_failed_replica.13.pollget.http | 21 + .../resync_failed_replica.14.query.sqlpp | 21 + .../resync_failed_replica.15.ddl.sqlpp | 19 + .../resync_failed_replica.16.query.sqlpp | 21 + .../resync_failed_replica.17.sto.cmd | 19 + .../resync_failed_replica.2.pollget.http | 21 + .../resync_failed_replica.3.ddl.sqlpp | 23 + .../resync_failed_replica.4.node.cmd | 19 + .../resync_failed_replica.5.pollget.http | 21 + .../resync_failed_replica.6.pollget.http | 21 + .../resync_failed_replica.7.node.cmd | 19 + .../resync_failed_replica.8.pollget.http | 21 + .../resync_failed_replica.9.sto.cmd | 19 + .../node_failback.cluster_state.10.adm | 38 - .../node_failback.cluster_state.5.adm | 38 - .../node_failback/node_failback.query.11.adm | 1 - .../node_failback/node_failback.query.6.adm | 1 - .../results/failover/bulkload/bulkload.5.adm | 1 - .../mem_component_recovery.5.adm | 1 - .../failover/metadata_node/metadata_node.4.adm | 1 - .../resync_failed_replica.10.adm | 7 + .../resync_failed_replica.11.adm | 0 .../resync_failed_replica.12.adm | 0 .../resync_failed_replica.13.adm | 38 + .../resync_failed_replica.14.adm | 1 + .../resync_failed_replica.16.adm | 3 + .../resync_failed_replica.2.adm | 7 + .../resync_failed_replica.5.adm | 38 + .../resync_failed_replica.6.adm | 7 + .../resync_failed_replica.8.adm | 38 + .../integrationts/replication/testsuite.xml | 21 +- .../PersistentLocalResourceRepository.java | 72 +- ...ersistentLocalResourceRepositoryFactory.java | 15 +- .../management/service/logging/LogBuffer.java | 9 +- .../logging/LogManagerWithReplication.java | 10 +- .../recovery/CheckpointManagerFactory.java | 8 +- .../recovery/ReplicationCheckpointManager.java | 144 -- .../api/replication/IReplicationJob.java | 4 + 253 files changed, 4156 insertions(+), 8776 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 425cbe4..9c53c18 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -54,11 +54,8 @@ import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.common.replication.IRemoteRecoveryManager; -import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; import org.apache.asterix.common.replication.IReplicationManager; -import org.apache.asterix.common.replication.Replica; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.storage.IReplicaManager; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; @@ -72,10 +69,8 @@ import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.api.IAsterixStateProxy; import org.apache.asterix.metadata.api.IMetadataNode; import org.apache.asterix.metadata.bootstrap.MetadataBootstrap; -import org.apache.asterix.replication.management.ReplicationChannel; import org.apache.asterix.replication.management.ReplicationManager; -import org.apache.asterix.replication.recovery.RemoteRecoveryManager; -import org.apache.asterix.replication.storage.ReplicaResourcesManager; +import org.apache.asterix.replication.management.ReplicationChannel; import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider; import org.apache.asterix.runtime.utils.NoOpCoordinationService; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; @@ -139,8 +134,6 @@ public class NCAppRuntimeContext implements INcApplicationContext { private ActiveManager activeManager; private IReplicationChannel replicationChannel; private IReplicationManager replicationManager; - private IRemoteRecoveryManager remoteRecoveryManager; - private IReplicaResourcesManager replicaResourcesManager; private final ILibraryManager libraryManager; private final NCExtensionManager ncExtensionManager; private final IStorageComponentProvider componentProvider; @@ -190,8 +183,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager); ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = - new PersistentLocalResourceRepositoryFactory(ioManager, getServiceContext().getNodeId(), - metadataProperties, indexCheckpointManagerProvider); + new PersistentLocalResourceRepositoryFactory(ioManager, indexCheckpointManagerProvider); localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository(); @@ -224,47 +216,21 @@ public class NCAppRuntimeContext implements INcApplicationContext { this.ncServiceContext); if (replicationProperties.isReplicationEnabled()) { + replicationManager = new ReplicationManager(this, replicationProperties); - replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties, - indexCheckpointManagerProvider); - - replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager, - txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider, ncServiceContext); - - if (replicationManager.getReplicationStrategy().isParticipant(getServiceContext().getNodeId())) { - - //pass replication manager to replication required object - //LogManager to replicate logs - txnSubsystem.getLogManager().setReplicationManager(replicationManager); - - //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index - localResourceRepository.setReplicationManager(replicationManager); - - /* - * add the partitions that will be replicated in this node as inactive partitions - */ - //get nodes which replicate to this node - Set<String> remotePrimaryReplicas = replicationManager.getReplicationStrategy() - .getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet()); - for (String clientId : remotePrimaryReplicas) { - //get the partitions of each client - ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId); - for (ClusterPartition partition : clientPartitions) { - localResourceRepository.addInactivePartition(partition.getPartitionId()); - } - } + //pass replication manager to replication required object + //LogManager to replicate logs + txnSubsystem.getLogManager().setReplicationManager(replicationManager); - //initialize replication channel - replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(), - replicaResourcesManager, replicationManager, getServiceContext(), - asterixAppRuntimeContextProvider, replicationManager.getReplicationStrategy()); + //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index + localResourceRepository.setReplicationManager(replicationManager); - remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties); + //initialize replication channel + replicationChannel = new ReplicationChannel(this); - bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), - storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(), - replicationManager); - } + bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), + storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(), + replicationManager); } else { bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory()); @@ -441,16 +407,6 @@ public class NCAppRuntimeContext implements INcApplicationContext { } @Override - public IReplicaResourcesManager getReplicaResourcesManager() { - return replicaResourcesManager; - } - - @Override - public IRemoteRecoveryManager getRemoteRecoveryManager() { - return remoteRecoveryManager; - } - - @Override public IReplicationManager getReplicationManager() { return replicationManager; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index f458688..080ad48 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; @@ -46,7 +47,6 @@ import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; -import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.transactions.Checkpoint; @@ -96,16 +96,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private final ICheckpointManager checkpointManager; private SystemState state; private final INCServiceContext serviceCtx; + private final INcApplicationContext appCtx; + public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) { this.serviceCtx = serviceCtx; this.txnSubsystem = txnSubsystem; + this.appCtx = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext(); logMgr = (LogManager) txnSubsystem.getLogManager(); - ReplicationProperties repProperties = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext() - .getReplicationProperties(); + ReplicationProperties repProperties = appCtx.getReplicationProperties(); replicationEnabled = repProperties.isReplicationEnabled(); - localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider() - .getLocalResourceRepository(); + localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize(); checkpointManager = txnSubsystem.getCheckpointManager(); } @@ -129,32 +130,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { LOGGER.info("The checkpoint file doesn't exist: systemState = PERMANENT_DATA_LOSS"); return state; } - - if (replicationEnabled) { - if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { - //no logs exist - state = SystemState.HEALTHY; - } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp()) { - //only remote logs exist - state = SystemState.HEALTHY; - } else { - //need to perform remote recovery - state = SystemState.CORRUPTED; + long readableSmallestLSN = logMgr.getReadableSmallestLSN(); + if (logMgr.getAppendLSN() == readableSmallestLSN) { + if (checkpointObject.getMinMCTFirstLsn() != AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { + LOGGER.warn("Some(or all) of transaction log files are lost."); + //No choice but continuing when the log files are lost. } + state = SystemState.HEALTHY; + } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() + && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { + state = SystemState.HEALTHY; } else { - long readableSmallestLSN = logMgr.getReadableSmallestLSN(); - if (logMgr.getAppendLSN() == readableSmallestLSN) { - if (checkpointObject.getMinMCTFirstLsn() != AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { - LOGGER.warn("Some(or all) of transaction log files are lost."); - //No choice but continuing when the log files are lost. - } - state = SystemState.HEALTHY; - } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() - && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { - state = SystemState.HEALTHY; - } else { - state = SystemState.CORRUPTED; - } + state = SystemState.CORRUPTED; } return state; } @@ -442,9 +429,47 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } private long getRemoteMinFirstLSN() throws HyracksDataException { - IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getAppContext().getReplicaResourcesManager(); - return remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions()); + // find the min first lsn of partitions that are replicated on this node + final Set<Integer> allPartitions = localResourceRepository.getAllPartitions(); + final INcApplicationContext appContext = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext(); + final Set<Integer> masterPartitions = appContext.getReplicaManager().getPartitions(); + allPartitions.removeAll(masterPartitions); + return getPartitionsMinLSN(allPartitions); + } + + private long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException { + final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = appCtx.getIndexCheckpointManagerProvider(); + long minRemoteLSN = Long.MAX_VALUE; + for (Integer partition : partitions) { + final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> { + DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); + return dsResource.getPartition() == partition; + }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + for (DatasetResourceReference indexRef : partitionResources) { + long remoteIndexMaxLSN = idxCheckpointMgrProvider.get(indexRef).getLowWatermark(); + minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); + } + } + return minRemoteLSN; + } + + @Override + public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException { + long minLSN = getPartitionsMinLSN(partitions); + long readableSmallestLSN = logMgr.getReadableSmallestLSN(); + if (minLSN < readableSmallestLSN) { + minLSN = readableSmallestLSN; + } + + //replay logs > minLSN that belong to these partitions + try { + replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN); + if (flush) { + appCtx.getDatasetLifecycleManager().flushAllDatasets(); + } + } catch (IOException | ACIDException e) { + throw HyracksDataException.create(e); + } } @Override @@ -535,7 +560,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { TxnEntityId loserEntity; List<Long> undoLSNSet = null; //get active partitions on this node - Set<Integer> activePartitions = localResourceRepository.getActivePartitions(); + Set<Integer> activePartitions = appCtx.getReplicaManager().getPartitions(); ILogReader logReader = logMgr.getLogReader(false); try { logReader.setPosition(firstLSN); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java index bf17a5b..155fa1d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java @@ -29,10 +29,10 @@ import java.util.stream.Stream; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.replication.IPartitionReplica; -import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.storage.IReplicaManager; import org.apache.asterix.common.storage.ReplicaIdentifier; -import org.apache.asterix.replication.storage.PartitionReplica; +import org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.replication.api.PartitionReplica; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ReplicaManager implements IReplicaManager { @@ -67,7 +67,9 @@ public class ReplicaManager implements IReplicaManager { if (!replicas.containsKey(id)) { throw new IllegalStateException("replica with id(" + id + ") does not exist"); } - replicas.remove(id); + PartitionReplica replica = replicas.remove(id); + appCtx.getReplicationManager().unregister(replica); + } @Override @@ -83,8 +85,8 @@ public class ReplicaManager implements IReplicaManager { @Override public synchronized void promote(int partition) throws HyracksDataException { - final IRemoteRecoveryManager remoteRecoveryManager = appCtx.getRemoteRecoveryManager(); - remoteRecoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true); + final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager(); + recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true); partitions.add(partition); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index cd9b617..fae0413 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -76,7 +76,7 @@ public class TransactionSubsystem implements ITransactionSubsystem { if (LOGGER.isInfoEnabled()) { LOGGER.log(Level.INFO, "Checkpoint Properties: " + checkpointProperties); } - checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled); + checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties); final Checkpoint latestCheckpoint = checkpointManager.getLatest(); if (latestCheckpoint != null) { transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java deleted file mode 100644 index d330684..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.nc.task; - -import java.io.IOException; -import java.util.Map; -import java.util.Set; - -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -public class RemoteRecoveryTask implements INCLifecycleTask { - - private static final long serialVersionUID = 1L; - private final Map<String, Set<Integer>> recoveryPlan; - - public RemoteRecoveryTask(Map<String, Set<Integer>> recoveryPlan) { - this.recoveryPlan = recoveryPlan; - } - - @Override - public void perform(IControllerService cs) throws HyracksDataException { - INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); - appContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan); - } - - private void writeObject(java.io.ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - } - - private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - } - - @Override - public String toString() { - return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"recovery-plan\" : " + recoveryPlan + " }"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java deleted file mode 100644 index ecd93a3..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.nc.task; - -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -public class StartFailbackTask implements INCLifecycleTask { - - private static final long serialVersionUID = 1L; - - @Override - public void perform(IControllerService cs) throws HyracksDataException { - INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); - appContext.getRemoteRecoveryManager().startFailbackProcess(); - } - - @Override - public String toString() { - return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }"; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java index 253f121..7071271 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java @@ -20,7 +20,6 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.replication.IReplicationManager; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; @@ -32,13 +31,8 @@ public class StartReplicationServiceTask implements INCLifecycleTask { public void perform(IControllerService cs) throws HyracksDataException { INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); try { - //Open replication channel + // open replication channel appContext.getReplicationChannel().start(); - final IReplicationManager replicationManager = appContext.getReplicationManager(); - //Check the state of remote replicas - replicationManager.initializeReplicasState(); - //Start replication after the state of remote replicas has been initialized. - replicationManager.startReplicationThreads(); } catch (Exception e) { throw HyracksDataException.create(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java deleted file mode 100644 index 37b4d4f..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java +++ /dev/null @@ -1,542 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.replication; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.asterix.app.nc.task.BindMetadataNodeTask; -import org.apache.asterix.app.nc.task.CheckpointTask; -import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; -import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import org.apache.asterix.app.nc.task.ReportLocalCountersTask; -import org.apache.asterix.app.nc.task.StartFailbackTask; -import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; -import org.apache.asterix.app.nc.task.StartReplicationServiceTask; -import org.apache.asterix.app.replication.NodeFailbackPlan.FailbackPlanState; -import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage; -import org.apache.asterix.app.replication.message.CompleteFailbackResponseMessage; -import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; -import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage; -import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage; -import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; -import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; -import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage; -import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage; -import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage; -import org.apache.asterix.app.replication.message.TakeoverPartitionsResponseMessage; -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; -import org.apache.asterix.common.replication.IFaultToleranceStrategy; -import org.apache.asterix.common.replication.INCLifecycleMessage; -import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.common.replication.Replica; -import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.util.FaultToleranceUtil; -import org.apache.hyracks.api.application.ICCServiceContext; -import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { - - private static final Logger LOGGER = LogManager.getLogger(); - private long clusterRequestId = 0; - - private Set<String> failedNodes = new HashSet<>(); - private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new LinkedList<>(); - private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new HashMap<>(); - private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = new HashMap<>();; - private String currentMetadataNode; - private boolean metadataNodeActive = false; - private IClusterStateManager clusterManager; - private ICCMessageBroker messageBroker; - private IReplicationStrategy replicationStrategy; - private ICCServiceContext serviceCtx; - private Set<String> pendingStartupCompletionNodes = new HashSet<>(); - private List<String> nodeIds; - private Map<String, SystemState> startupQueue = new HashMap<>(); - - @Override - public void notifyNodeJoin(String nodeId) { - pendingStartupCompletionNodes.add(nodeId); - } - - @Override - public void notifyNodeFailure(String nodeId) throws HyracksDataException { - //if this node was waiting for failback and failed before it completed - if (failedNodes.contains(nodeId)) { - notifyFailbackPlansNodeFailure(nodeId); - revertFailedFailbackPlanEffects(); - return; - } - //an active node failed - failedNodes.add(nodeId); - clusterManager.updateNodePartitions(nodeId, false); - if (nodeId.equals(currentMetadataNode)) { - metadataNodeActive = false; - clusterManager.updateMetadataNode(nodeId, metadataNodeActive); - } - validateClusterState(); - FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE, clusterManager, messageBroker, - replicationStrategy); - notifyFailbackPlansNodeFailure(nodeId); - requestPartitionsTakeover(nodeId); - } - - private synchronized void notifyFailbackPlansNodeFailure(String nodeId) { - for (NodeFailbackPlan plan : planId2FailbackPlanMap.values()) { - plan.notifyNodeFailure(nodeId); - } - } - - private synchronized void revertFailedFailbackPlanEffects() { - Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); - while (iterator.hasNext()) { - NodeFailbackPlan plan = iterator.next(); - if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //TODO if the failing back node is still active, notify it to construct a new plan for it - iterator.remove(); - - //reassign the partitions that were supposed to be failed back to an active replica - requestPartitionsTakeover(plan.getNodeId()); - } - } - } - - private synchronized void requestPartitionsTakeover(String failedNodeId) { - //replica -> list of partitions to takeover - Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>(); - //collect the partitions of the failed NC - List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId); - if (!lostPartitions.isEmpty()) { - for (ClusterPartition partition : lostPartitions) { - //find replicas for this partitions - Set<String> partitionReplicas = replicationStrategy.getRemoteReplicas(partition.getNodeId()).stream() - .map(Replica::getId).collect(Collectors.toSet()); - //find a replica that is still active - for (String replica : partitionReplicas) { - //TODO (mhubail) currently this assigns the partition to the first found active replica. - //It needs to be modified to consider load balancing. - if (addActiveReplica(replica, partition, partitionRecoveryPlan)) { - break; - } - } - } - - if (partitionRecoveryPlan.size() == 0) { - //no active replicas were found for the failed node - LOGGER.error("Could not find active replicas for the partitions " + lostPartitions); - return; - } else { - LOGGER.info("Partitions to recover: " + lostPartitions); - } - //For each replica, send a request to takeover the assigned partitions - partitionRecoveryPlan.forEach((replica, value) -> { - Integer[] partitionsToTakeover = value.toArray(new Integer[value.size()]); - long requestId = clusterRequestId++; - TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, - replica, partitionsToTakeover); - pendingTakeoverRequests.put(requestId, takeoverRequest); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, replica); - } catch (Exception e) { - /* - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, we will send any pending request - * that belongs to the failed NC to a different active replica. - */ - LOGGER.log(Level.WARN, "Failed to send takeover request: " + takeoverRequest, e); - } - }); - } - } - - private boolean addActiveReplica(String replica, ClusterPartition partition, - Map<String, List<Integer>> partitionRecoveryPlan) { - final Set<String> participantNodes = clusterManager.getParticipantNodes(); - if (participantNodes.contains(replica) && !failedNodes.contains(replica)) { - if (!partitionRecoveryPlan.containsKey(replica)) { - List<Integer> replicaPartitions = new ArrayList<>(); - replicaPartitions.add(partition.getPartitionId()); - partitionRecoveryPlan.put(replica, replicaPartitions); - } else { - partitionRecoveryPlan.get(replica).add(partition.getPartitionId()); - } - return true; - } - return false; - } - - private synchronized void prepareFailbackPlan(String failingBackNodeId) { - NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId); - pendingProcessingFailbackPlans.add(plan); - planId2FailbackPlanMap.put(plan.getPlanId(), plan); - - //get all partitions this node requires to resync - Set<String> nodeReplicas = replicationStrategy.getRemoteReplicas(failingBackNodeId).stream().map(Replica::getId) - .collect(Collectors.toSet()); - clusterManager.getClusterPartitons(); - for (String replicaId : nodeReplicas) { - ClusterPartition[] nodePartitions = clusterManager.getNodePartitions(replicaId); - for (ClusterPartition partition : nodePartitions) { - plan.addParticipant(partition.getActiveNodeId()); - /* - * if the partition original node is the returning node, - * add it to the list of the partitions which will be failed back - */ - if (partition.getNodeId().equals(failingBackNodeId)) { - plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId()); - } - } - } - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Prepared Failback plan: " + plan.toString()); - } - - processPendingFailbackPlans(); - } - - private synchronized void processPendingFailbackPlans() { - ClusterState state = clusterManager.getState(); - /* - * if the cluster state is not ACTIVE, then failbacks should not be processed - * since some partitions are not active - */ - if (state == ClusterState.ACTIVE) { - while (!pendingProcessingFailbackPlans.isEmpty()) { - //take the first pending failback plan - NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop(); - /* - * A plan at this stage will be in one of two states: - * 1. PREPARING -> the participants were selected but we haven't sent any request. - * 2. PENDING_ROLLBACK -> a participant failed before we send any requests - */ - if (plan.getState() == FailbackPlanState.PREPARING) { - //set the partitions that will be failed back as inactive - String failbackNode = plan.getNodeId(); - for (Integer partitionId : plan.getPartitionsToFailback()) { - //partition expected to be returned to the failing back node - clusterManager.updateClusterPartition(partitionId, failbackNode, false); - } - - /* - * if the returning node is the original metadata node, - * then metadata node will change after the failback completes - */ - ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); - String originalMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName(); - if (originalMetadataNode.equals(failbackNode)) { - plan.setNodeToReleaseMetadataManager(currentMetadataNode); - currentMetadataNode = ""; - metadataNodeActive = false; - clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive); - } - - //force new jobs to wait - clusterManager.setState(ClusterState.REBALANCING); - handleFailbackRequests(plan, messageBroker); - /* - * wait until the current plan is completed before processing the next plan. - * when the current one completes or is reverted, the cluster state will be - * ACTIVE again, and the next failback plan (if any) will be processed. - */ - break; - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //this plan failed before sending any requests -> nothing to rollback - planId2FailbackPlanMap.remove(plan.getPlanId()); - } - } - } - } - - private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) { - //send requests to other nodes to complete on-going jobs and prepare partitions for failback - for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) { - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeID()); - plan.addPendingRequest(request); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Failed to send failback request to: " + request.getNodeID(), e); - plan.notifyNodeFailure(request.getNodeID()); - revertFailedFailbackPlanEffects(); - break; - } - } - } - - public synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) { - List<ClusterPartition> nodePartitions = new ArrayList<>(); - ClusterPartition[] clusterPartitons = clusterManager.getClusterPartitons(); - Map<Integer, ClusterPartition> clusterPartitionsMap = new HashMap<>(); - for (ClusterPartition partition : clusterPartitons) { - clusterPartitionsMap.put(partition.getPartitionId(), partition); - } - for (ClusterPartition partition : clusterPartitons) { - if (nodeId.equals(partition.getActiveNodeId())) { - nodePartitions.add(partition); - } - } - /* - * if there is any pending takeover request this node was supposed to handle, - * it needs to be sent to a different replica - */ - List<Long> failedTakeoverRequests = new ArrayList<>(); - for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) { - if (request.getNodeId().equals(nodeId)) { - for (Integer partitionId : request.getPartitions()) { - nodePartitions.add(clusterPartitionsMap.get(partitionId)); - } - failedTakeoverRequests.add(request.getRequestId()); - } - } - - //remove failed requests - for (Long requestId : failedTakeoverRequests) { - pendingTakeoverRequests.remove(requestId); - } - return nodePartitions; - } - - public synchronized void process(TakeoverPartitionsResponseMessage response) throws HyracksDataException { - for (Integer partitonId : response.getPartitions()) { - clusterManager.updateClusterPartition(partitonId, response.getNodeId(), true); - } - pendingTakeoverRequests.remove(response.getRequestId()); - validateClusterState(); - } - - public synchronized void process(MetadataNodeResponseMessage response) throws HyracksDataException { - currentMetadataNode = response.getNodeId(); - metadataNodeActive = true; - clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive); - validateClusterState(); - } - - private void validateClusterState() throws HyracksDataException { - clusterManager.refreshState(); - ClusterState newState = clusterManager.getState(); - // PENDING: all partitions are active but metadata node is not - if (newState == ClusterState.PENDING) { - requestMetadataNodeTakeover(); - } else if (newState == ClusterState.ACTIVE) { - processPendingFailbackPlans(); - } - } - - public synchronized void process(PreparePartitionsFailbackResponseMessage msg) { - NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId()); - plan.markRequestCompleted(msg.getRequestId()); - /* - * A plan at this stage will be in one of three states: - * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait). - * 2. PENDING_COMPLETION -> all responses received (time to send completion request). - * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert). - */ - if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) { - CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage(); - - //send complete resync and takeover partitions to the failing back node - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeId()); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Failed to send complete failback request to: " + request.getNodeId(), e); - notifyFailbackPlansNodeFailure(request.getNodeId()); - revertFailedFailbackPlanEffects(); - } - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - revertFailedFailbackPlanEffects(); - } - } - - public synchronized void process(CompleteFailbackResponseMessage response) throws HyracksDataException { - /* - * the failback plan completed successfully: - * Remove all references to it. - * Remove the the failing back node from the failed nodes list. - * Notify its replicas to reconnect to it. - * Set the failing back node partitions as active. - */ - NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId()); - String nodeId = plan.getNodeId(); - failedNodes.remove(nodeId); - //notify impacted replicas they can reconnect to this node - FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager, messageBroker, - replicationStrategy); - clusterManager.updateNodePartitions(nodeId, true); - validateClusterState(); - } - - private synchronized void requestMetadataNodeTakeover() { - //need a new node to takeover metadata node - ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); - ClusterPartition metadataPartiton = appCtx.getMetadataProperties().getMetadataPartition(); - //request the metadataPartition node to register itself as the metadata node - MetadataNodeRequestMessage takeoverRequest = new MetadataNodeRequestMessage(true); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId()); - // Since the metadata node will be changed, we need to rebind the proxy object - MetadataManager.INSTANCE.rebindMetadataNode(); - } catch (Exception e) { - /* - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, a new NC will be assigned to - * the metadata partition and a new metadata node takeover request will be sent to it. - */ - LOGGER.log(Level.WARN, - "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e); - } - } - - @Override - public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy) { - AutoFaultToleranceStrategy ft = new AutoFaultToleranceStrategy(); - ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); - ft.replicationStrategy = replicationStrategy; - ft.serviceCtx = serviceCtx; - ft.nodeIds = serviceCtx.getAppConfig().getNCNames(); - return ft; - } - - @Override - public synchronized void process(INCLifecycleMessage message) throws HyracksDataException { - switch (message.getType()) { - case REGISTRATION_TASKS_REQUEST: - process((RegistrationTasksRequestMessage) message); - break; - case REGISTRATION_TASKS_RESULT: - process((NCLifecycleTaskReportMessage) message); - break; - case TAKEOVER_PARTITION_RESPONSE: - process((TakeoverPartitionsResponseMessage) message); - break; - case METADATA_NODE_RESPONSE: - process((MetadataNodeResponseMessage) message); - break; - case PREPARE_FAILBACK_RESPONSE: - process((PreparePartitionsFailbackResponseMessage) message); - break; - case COMPLETE_FAILBACK_RESPONSE: - process((CompleteFailbackResponseMessage) message); - break; - default: - throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name()); - } - } - - private synchronized void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { - final String nodeId = msg.getNodeId(); - pendingStartupCompletionNodes.remove(nodeId); - if (msg.isSuccess()) { - if (failedNodes.contains(nodeId)) { - prepareFailbackPlan(nodeId); - return; - } - // If this node failed and recovered, notify impacted replicas to reconnect to it - if (replicationStrategy.isParticipant(nodeId) && failedNodes.remove(nodeId)) { - FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager, - messageBroker, replicationStrategy); - } - clusterManager.updateNodePartitions(nodeId, true); - if (msg.getNodeId().equals(currentMetadataNode)) { - clusterManager.updateMetadataNode(currentMetadataNode, true); - } - clusterManager.refreshState(); - } else { - LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete startup. ", msg.getException()); - } - } - - @Override - public void bindTo(IClusterStateManager clusterManager) { - this.clusterManager = clusterManager; - currentMetadataNode = clusterManager.getCurrentMetadataNodeId(); - } - - private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { - final String nodeId = msg.getNodeId(); - final SystemState state = msg.getState(); - //last node needed to start - if (startupQueue.keySet().size() == nodeIds.size() - 1) { - startupQueue.put(nodeId, state); - for (Map.Entry<String, SystemState> nodeState : startupQueue.entrySet()) { - List<INCLifecycleTask> tasks = buildStartupSequence(nodeState.getKey()); - RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeState.getKey(), - tasks); - try { - messageBroker.sendApplicationMessageToNC(response, nodeState.getKey()); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } - } else if (!failedNodes.isEmpty()) { - List<INCLifecycleTask> tasks = buildFailbackStartupSequence(); - RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); - try { - messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } else { - startupQueue.put(nodeId, state); - } - } - - private List<INCLifecycleTask> buildFailbackStartupSequence() { - final List<INCLifecycleTask> tasks = new ArrayList<>(); - tasks.add(new StartFailbackTask()); - tasks.add(new ReportLocalCountersTask()); - tasks.add(new StartLifecycleComponentsTask()); - return tasks; - } - - private List<INCLifecycleTask> buildStartupSequence(String nodeId) { - final List<INCLifecycleTask> tasks = new ArrayList<>(); - tasks.add(new StartReplicationServiceTask()); - final boolean isMetadataNode = nodeId.equals(currentMetadataNode); - if (isMetadataNode) { - tasks.add(new MetadataBootstrapTask()); - } - tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportLocalCountersTask()); - tasks.add(new CheckpointTask()); - tasks.add(new StartLifecycleComponentsTask()); - if (isMetadataNode) { - tasks.add(new BindMetadataNodeTask(true)); - } - return tasks; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java index fe24fca..5a715d6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java @@ -18,9 +18,7 @@ */ package org.apache.asterix.app.replication; -import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.replication.IFaultToleranceStrategy; -import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.hyracks.api.application.ICCServiceContext; public class FaultToleranceStrategyFactory { @@ -29,20 +27,10 @@ public class FaultToleranceStrategyFactory { throw new AssertionError(); } - public static final String STRATEGY_NAME = "metadata_only"; - - public static IFaultToleranceStrategy create(ICCServiceContext serviceCtx, ReplicationProperties repProp, - IReplicationStrategy strategy) { - Class<? extends IFaultToleranceStrategy> clazz; - if (!repProp.isReplicationEnabled()) { - clazz = NoFaultToleranceStrategy.class; - } else if (STRATEGY_NAME.equals(repProp.getReplicationStrategy())) { - clazz = MetadataNodeFaultToleranceStrategy.class; - } else { - clazz = AutoFaultToleranceStrategy.class; - } + public static IFaultToleranceStrategy create(ICCServiceContext serviceCtx, boolean replicationEnabled) { + Class<? extends IFaultToleranceStrategy> clazz = NoFaultToleranceStrategy.class; try { - return clazz.newInstance().from(serviceCtx, strategy); + return clazz.newInstance().from(serviceCtx, replicationEnabled); } catch (InstantiationException | IllegalAccessException e) { throw new IllegalStateException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java deleted file mode 100644 index 02174da..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.replication; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.asterix.app.nc.task.BindMetadataNodeTask; -import org.apache.asterix.app.nc.task.CheckpointTask; -import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; -import org.apache.asterix.app.nc.task.LocalRecoveryTask; -import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import org.apache.asterix.app.nc.task.RemoteRecoveryTask; -import org.apache.asterix.app.nc.task.ReportLocalCountersTask; -import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; -import org.apache.asterix.app.nc.task.StartReplicationServiceTask; -import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage; -import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage; -import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; -import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; -import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage; -import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage; -import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; -import org.apache.asterix.common.replication.IFaultToleranceStrategy; -import org.apache.asterix.common.replication.INCLifecycleMessage; -import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.common.replication.Replica; -import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; -import org.apache.asterix.util.FaultToleranceUtil; -import org.apache.hyracks.api.application.ICCServiceContext; -import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrategy { - - private static final Logger LOGGER = LogManager.getLogger(); - private IClusterStateManager clusterManager; - private String metadataNodeId; - private IReplicationStrategy replicationStrategy; - private ICCMessageBroker messageBroker; - private ICCServiceContext serviceCtx; - private final Set<String> hotStandbyMetadataReplica = new HashSet<>(); - private final Set<String> failedNodes = new HashSet<>(); - private Set<String> pendingStartupCompletionNodes = new HashSet<>(); - - @Override - public synchronized void notifyNodeJoin(String nodeId) throws HyracksDataException { - pendingStartupCompletionNodes.add(nodeId); - } - - @Override - public synchronized void notifyNodeFailure(String nodeId) throws HyracksDataException { - failedNodes.add(nodeId); - hotStandbyMetadataReplica.remove(nodeId); - clusterManager.updateNodePartitions(nodeId, false); - if (nodeId.equals(metadataNodeId)) { - clusterManager.updateMetadataNode(metadataNodeId, false); - } - clusterManager.refreshState(); - if (replicationStrategy.isParticipant(nodeId)) { - // Notify impacted replica - FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE, clusterManager, - messageBroker, replicationStrategy); - } - // If the failed node is the metadata node, ask its replicas to replay any committed jobs - if (nodeId.equals(metadataNodeId)) { - ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); - int metadataPartitionId = appCtx.getMetadataProperties().getMetadataPartition().getPartitionId(); - Set<Integer> metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId)); - Set<Replica> activeRemoteReplicas = replicationStrategy.getRemoteReplicas(metadataNodeId).stream() - .filter(replica -> !failedNodes.contains(replica.getId())).collect(Collectors.toSet()); - //TODO Do election to identity the node with latest state - for (Replica replica : activeRemoteReplicas) { - ReplayPartitionLogsRequestMessage msg = new ReplayPartitionLogsRequestMessage(metadataPartition); - try { - messageBroker.sendApplicationMessageToNC(msg, replica.getId()); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Failed sending an application message to an NC", e); - continue; - } - } - } - } - - @Override - public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy) { - MetadataNodeFaultToleranceStrategy ft = new MetadataNodeFaultToleranceStrategy(); - ft.replicationStrategy = replicationStrategy; - ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); - ft.serviceCtx = serviceCtx; - return ft; - } - - @Override - public synchronized void process(INCLifecycleMessage message) throws HyracksDataException { - switch (message.getType()) { - case REGISTRATION_TASKS_REQUEST: - process((RegistrationTasksRequestMessage) message); - break; - case REGISTRATION_TASKS_RESULT: - process((NCLifecycleTaskReportMessage) message); - break; - case REPLAY_LOGS_RESPONSE: - process((ReplayPartitionLogsResponseMessage) message); - break; - case METADATA_NODE_RESPONSE: - process((MetadataNodeResponseMessage) message); - break; - default: - throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name()); - } - } - - @Override - public synchronized void bindTo(IClusterStateManager clusterManager) { - this.clusterManager = clusterManager; - this.metadataNodeId = clusterManager.getCurrentMetadataNodeId(); - } - - @Override - public void notifyMetadataNodeChange(String node) throws HyracksDataException { - if (metadataNodeId.equals(node)) { - return; - } - // if current metadata node is active, we need to unbind its metadata proxy object - if (clusterManager.isMetadataNodeActive()) { - MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false); - try { - messageBroker.sendApplicationMessageToNC(msg, metadataNodeId); - // when the current node responses, we will bind to the new one - metadataNodeId = node; - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } else { - requestMetadataNodeTakeover(node); - } - } - - private synchronized void process(ReplayPartitionLogsResponseMessage msg) { - hotStandbyMetadataReplica.add(msg.getNodeId()); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Hot Standby Metadata Replicas: " + hotStandbyMetadataReplica); - } - } - - private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { - final String nodeId = msg.getNodeId(); - final SystemState state = msg.getState(); - final boolean isParticipant = replicationStrategy.isParticipant(nodeId); - List<INCLifecycleTask> tasks; - if (!isParticipant) { - tasks = buildNonParticipantStartupSequence(nodeId, state); - } else { - tasks = buildParticipantStartupSequence(nodeId, state); - } - RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); - try { - messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } - - private synchronized void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { - final String nodeId = msg.getNodeId(); - pendingStartupCompletionNodes.remove(nodeId); - if (msg.isSuccess()) { - // If this node failed and recovered, notify impacted replicas to reconnect to it - if (replicationStrategy.isParticipant(nodeId) && failedNodes.remove(nodeId)) { - FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager, - messageBroker, replicationStrategy); - } - clusterManager.updateNodePartitions(msg.getNodeId(), true); - if (msg.getNodeId().equals(metadataNodeId)) { - clusterManager.updateMetadataNode(metadataNodeId, true); - // When metadata node is active, it is the only hot standby replica - hotStandbyMetadataReplica.clear(); - hotStandbyMetadataReplica.add(metadataNodeId); - } - clusterManager.refreshState(); - } else { - LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete startup. ", msg.getException()); - } - } - - private List<INCLifecycleTask> buildNonParticipantStartupSequence(String nodeId, SystemState state) { - final List<INCLifecycleTask> tasks = new ArrayList<>(); - if (state == SystemState.CORRUPTED) { - //need to perform local recovery for node partitions - LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId)) - .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); - tasks.add(rt); - } - tasks.add(new ExternalLibrarySetupTask(false)); - tasks.add(new ReportLocalCountersTask()); - tasks.add(new CheckpointTask()); - tasks.add(new StartLifecycleComponentsTask()); - return tasks; - } - - private List<INCLifecycleTask> buildParticipantStartupSequence(String nodeId, SystemState state) { - final List<INCLifecycleTask> tasks = new ArrayList<>(); - switch (state) { - case PERMANENT_DATA_LOSS: - if (failedNodes.isEmpty()) { //bootstrap - break; - } - // If the metadata node (or replica) failed and lost its data - // => Metadata Remote Recovery from standby replica - tasks.add(getMetadataPartitionRecoveryPlan()); - // Checkpoint after remote recovery to move node to HEALTHY state - tasks.add(new CheckpointTask()); - break; - case CORRUPTED: - // If the metadata node (or replica) failed and started again without losing data => Local Recovery - LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId)) - .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); - tasks.add(rt); - break; - case BOOTSTRAPPING: - case HEALTHY: - case RECOVERING: - break; - default: - break; - } - tasks.add(new StartReplicationServiceTask()); - final boolean isMetadataNode = nodeId.equals(metadataNodeId); - if (isMetadataNode) { - tasks.add(new MetadataBootstrapTask()); - } - tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportLocalCountersTask()); - tasks.add(new CheckpointTask()); - tasks.add(new StartLifecycleComponentsTask()); - if (isMetadataNode) { - tasks.add(new BindMetadataNodeTask(true)); - } - return tasks; - } - - private RemoteRecoveryTask getMetadataPartitionRecoveryPlan() { - if (hotStandbyMetadataReplica.isEmpty()) { - throw new IllegalStateException("No metadata replicas to recover from"); - } - // Construct recovery plan: Node => Set of partitions to recover from it - Map<String, Set<Integer>> recoveryPlan = new HashMap<>(); - // Recover metadata partition from any metadata hot standby replica - ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); - int metadataPartitionId = appCtx.getMetadataProperties().getMetadataPartition().getPartitionId(); - Set<Integer> metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId)); - recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), metadataPartition); - return new RemoteRecoveryTask(recoveryPlan); - } - - private void process(MetadataNodeResponseMessage response) throws HyracksDataException { - clusterManager.updateMetadataNode(response.getNodeId(), response.isExported()); - if (!response.isExported()) { - requestMetadataNodeTakeover(metadataNodeId); - } - } - - private void requestMetadataNodeTakeover(String node) throws HyracksDataException { - MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true); - try { - messageBroker.sendApplicationMessageToNC(msg, node); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } -} \ No newline at end of file
