Introduce Strategy Based Replication and Fault-Tolerance This change includes the following: - Introduce new APIs for Replication and Fault-Tolerance Strategies. - Add configuration in cluster description file for high-availability. - Add built-in replication strategies (Metadata_Only, Chained_Declustering). - Add built-in fault-tolerance strategies (Auto, Metadata_Node). - Remove non-cluster-state functionality from ClusterStateManager. - Add customizable NC startup sequence.
Change-Id: I1d1012f5541ce786f127866efefb9f3db434fedd Reviewed-on: https://asterix-gerrit.ics.uci.edu/1405 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ef173f34 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ef173f34 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ef173f34 Branch: refs/heads/master Commit: ef173f34f0b26627a6a99ed350becf0f6c1b8395 Parents: fff200c Author: Murtadha Hubail <[email protected]> Authored: Sun Feb 19 20:27:29 2017 +0300 Committer: abdullah alamoudi <[email protected]> Committed: Sun Feb 19 12:29:35 2017 -0800 ---------------------------------------------------------------------- .../common/AsterixHyracksIntegrationUtil.java | 34 +- .../asterix/app/nc/NCAppRuntimeContext.java | 10 +- .../apache/asterix/app/nc/RecoveryManager.java | 44 +- .../asterix/app/nc/TransactionSubsystem.java | 18 +- .../app/nc/task/BindMetadataNodeTask.java | 51 ++ .../asterix/app/nc/task/CheckpointTask.java | 39 ++ .../app/nc/task/ExternalLibrarySetupTask.java | 48 ++ .../asterix/app/nc/task/LocalRecoveryTask.java | 51 ++ .../app/nc/task/MetadataBootstrapTask.java | 44 ++ .../asterix/app/nc/task/RemoteRecoveryTask.java | 54 ++ .../app/nc/task/ReportMaxResourceIdTask.java | 35 ++ .../asterix/app/nc/task/StartFailbackTask.java | 37 ++ .../nc/task/StartLifecycleComponentsTask.java | 67 +++ .../nc/task/StartReplicationServiceTask.java | 49 ++ .../replication/AutoFaultToleranceStrategy.java | 526 +++++++++++++++++++ .../FaultToleranceStrategyFactory.java | 66 +++ .../MetadataNodeFaultToleranceStrategy.java | 253 +++++++++ .../replication/NoFaultToleranceStrategy.java | 146 +++++ .../app/replication/NodeFailbackPlan.java | 209 ++++++++ .../message/CompleteFailbackRequestMessage.java | 98 ++++ 
.../CompleteFailbackResponseMessage.java | 60 +++ .../message/NCLifecycleTaskReportMessage.java | 63 +++ ...PreparePartitionsFailbackRequestMessage.java | 125 +++++ ...reparePartitionsFailbackResponseMessage.java | 56 ++ .../ReplayPartitionLogsRequestMessage.java | 64 +++ .../ReplayPartitionLogsResponseMessage.java | 56 ++ .../message/StartupTaskRequestMessage.java | 72 +++ .../message/StartupTaskResponseMessage.java | 77 +++ .../TakeoverMetadataNodeRequestMessage.java | 74 +++ .../TakeoverMetadataNodeResponseMessage.java | 53 ++ .../TakeoverPartitionsRequestMessage.java | 111 ++++ .../TakeoverPartitionsResponseMessage.java | 65 +++ .../bootstrap/CCApplicationEntryPoint.java | 45 +- .../bootstrap/NCApplicationEntryPoint.java | 177 ++----- .../apache/asterix/util/FaultToleranceUtil.java | 67 +++ .../asterix-app/src/main/resources/cluster.xml | 20 +- .../results/api/replication/replication.1.adm | 4 +- .../common/api/IDatasetLifecycleManager.java | 9 + .../asterix/common/api/INCLifecycleTask.java | 36 ++ .../common/cluster/IClusterStateManager.java | 85 +++ .../common/config/ClusterProperties.java | 55 +- .../common/config/ReplicationProperties.java | 208 ++------ .../common/context/DatasetLifecycleManager.java | 10 + .../ChainedDeclusteringReplicationStrategy.java | 85 +++ .../replication/IFaultToleranceStrategy.java | 67 +++ .../common/replication/INCLifecycleMessage.java | 45 ++ .../replication/IRemoteRecoveryManager.java | 22 + .../common/replication/IReplicationManager.java | 6 +- .../replication/IReplicationStrategy.java | 58 ++ .../MetadataOnlyReplicationStrategy.java | 90 ++++ .../replication/NoReplicationStrategy.java | 52 ++ .../asterix/common/replication/Replica.java | 6 +- .../replication/ReplicationStrategyFactory.java | 62 +++ .../common/storage/IndexFileProperties.java | 74 +++ .../common/transactions/IRecoveryManager.java | 3 + .../asterix/common/utils/StoragePathUtil.java | 11 + .../src/main/resources/schema/cluster.xsd | 36 +- 
.../apache/asterix/test/aql/TestExecutor.java | 16 +- .../installer/command/ConfigureCommand.java | 3 +- .../installer/command/ValidateCommand.java | 36 +- .../local/local_chained_declustering_rep.xml | 81 +++ .../clusters/local/local_metadata_only_rep.xml | 83 +++ .../clusters/local/local_with_replication.xml | 76 --- .../test/AsterixInstallerIntegrationUtil.java | 6 +- .../installer/test/MetadataReplicationIT.java | 118 +++++ .../asterix/installer/test/ReplicationIT.java | 2 +- .../metadata_node_recovery.1.ddl.aql | 50 ++ .../metadata_node_recovery.10.node.aql | 30 ++ .../metadata_node_recovery.11.sleep.aql | 1 + .../metadata_node_recovery.12.mgx.aql | 1 + .../metadata_node_recovery.13.sleep.aql | 1 + .../metadata_node_recovery.14.ddl.aql | 33 ++ .../metadata_node_recovery.15.node.aql | 30 ++ .../metadata_node_recovery.16.sleep.aql | 1 + .../metadata_node_recovery.17.mgx.aql | 1 + .../metadata_node_recovery.18.sleep.aql | 1 + .../metadata_node_recovery.19.query.aql | 35 ++ .../metadata_node_recovery.2.node.aql | 30 ++ .../metadata_node_recovery.3.sleep.aql | 1 + .../metadata_node_recovery.4.get.http | 30 ++ .../metadata_node_recovery.5.mgx.aql | 1 + .../metadata_node_recovery.6.sleep.aql | 1 + .../metadata_node_recovery.7.get.http | 30 ++ .../metadata_node_recovery.8.query.aql | 34 ++ .../metadata_node_recovery.9.ddl.aql | 33 ++ .../metadata_node_recovery.cluster_state.4.adm | 34 ++ .../metadata_node_recovery.cluster_state.7.adm | 34 ++ .../metadata_node_recovery.query.19.adm | 1 + .../metadata_node_recovery.query.8.adm | 1 + .../metadata_only_replication/testsuite.xml | 27 + .../functions/ReplicaFilesRequest.java | 24 +- .../management/ReplicationChannel.java | 92 ++-- .../management/ReplicationManager.java | 84 ++- .../recovery/RemoteRecoveryManager.java | 108 +++- .../storage/ReplicaResourcesManager.java | 4 +- .../message/AbstractFailbackPlanMessage.java | 4 +- .../message/CompleteFailbackRequestMessage.java | 92 ---- .../CompleteFailbackResponseMessage.java 
| 54 -- .../runtime/message/NodeFailbackPlan.java | 206 -------- ...PreparePartitionsFailbackRequestMessage.java | 119 ----- ...reparePartitionsFailbackResponseMessage.java | 50 -- .../TakeoverMetadataNodeRequestMessage.java | 69 --- .../TakeoverMetadataNodeResponseMessage.java | 48 -- .../TakeoverPartitionsRequestMessage.java | 107 ---- .../TakeoverPartitionsResponseMessage.java | 60 --- .../asterix/runtime/utils/AppContextInfo.java | 31 +- .../runtime/utils/ClusterStateManager.java | 484 +++-------------- .../PersistentLocalResourceRepository.java | 35 +- .../logging/LogManagerWithReplication.java | 35 +- .../aoya/test/AsterixYARNInstanceUtil.java | 9 + .../aoya/test/AsterixYARNLibraryTestIT.java | 1 + .../aoya/test/AsterixYARNLifecycleIT.java | 1 + 112 files changed, 4782 insertions(+), 1885 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 83233f1..54804a1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -18,23 +18,13 @@ */ package org.apache.asterix.api.common; -import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER; - -import java.io.File; -import java.io.IOException; -import java.net.Inet4Address; -import java.util.ArrayList; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.logging.Level; -import 
java.util.logging.Logger; - -import org.apache.asterix.common.config.PropertiesAccessor; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.config.PropertiesAccessor; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint; import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -46,6 +36,18 @@ import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.NodeControllerService; +import java.io.File; +import java.io.IOException; +import java.net.Inet4Address; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER; + public class AsterixHyracksIntegrationUtil { static class LoggerHolder { static final Logger LOGGER = Logger.getLogger(AsterixHyracksIntegrationUtil.class.getName()); @@ -100,6 +102,12 @@ public class AsterixHyracksIntegrationUtil { for (Thread thread : startupThreads) { thread.join(); } + // Wait until cluster becomes active + synchronized (ClusterStateManager.INSTANCE) { + while (ClusterStateManager.INSTANCE.getState() != ClusterState.ACTIVE) { + ClusterStateManager.INSTANCE.wait(); + } + } hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort); ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]); } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index b114e8c..b4b7a95 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -35,7 +35,6 @@ import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.BuildProperties; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.CompilerProperties; import org.apache.asterix.common.config.ExtensionProperties; import org.apache.asterix.common.config.ExternalProperties; @@ -206,7 +205,7 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(), feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize()); - if (ClusterProperties.INSTANCE.isReplicationEnabled()) { + if (replicationProperties.isParticipant(ncApplicationContext.getNodeId())) { String nodeId = ncApplicationContext.getNodeId(); replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties); @@ -225,10 +224,8 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { * add the partitions that will be replicated in this node as inactive partitions */ //get nodes which replicate to this node - Set<String> replicationClients = replicationProperties.getNodeReplicationClients(nodeId); - //remove the node itself - 
replicationClients.remove(nodeId); - for (String clientId : replicationClients) { + Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId); + for (String clientId : remotePrimaryReplicas) { //get the partitions of each client ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId); for (ClusterPartition partition : clientPartitions) { @@ -475,5 +472,4 @@ public class NCAppRuntimeContext implements IAppRuntimeContext { public IStorageComponentProvider getStorageComponentProvider() { return componentProvider; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 4edf991..2efb139 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -43,7 +43,8 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.replication.IReplicaResourcesManager; @@ -97,9 +98,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { this.appCtx = appCtx; this.txnSubsystem = txnSubsystem; logMgr = (LogManager) txnSubsystem.getLogManager(); - replicationEnabled = 
ClusterProperties.INSTANCE.isReplicationEnabled(); - localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem - .getAsterixAppRuntimeContextProvider().getLocalResourceRepository(); + ReplicationProperties repProperties = ((IPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider() + .getAppContext()).getReplicationProperties(); + replicationEnabled = repProperties.isParticipant(txnSubsystem.getId()); + localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider() + .getLocalResourceRepository(); cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize(); checkpointManager = txnSubsystem.getCheckpointManager(); } @@ -127,15 +130,15 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } if (replicationEnabled) { - if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN - || (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp())) { - //no logs exist or only remote logs exist + if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { + //no logs exist + state = SystemState.HEALTHY; + } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp()) { + //only remote logs exist state = SystemState.HEALTHY; - return state; } else { //need to perform remote recovery state = SystemState.CORRUPTED; - return state; } } else { long readableSmallestLSN = logMgr.getReadableSmallestLSN(); @@ -145,16 +148,14 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //No choice but continuing when the log files are lost. 
} state = SystemState.HEALTHY; - return state; } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { state = SystemState.HEALTHY; - return state; } else { state = SystemState.CORRUPTED; - return state; } } + return state; } //This method is used only when replication is disabled. @@ -179,6 +180,25 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } @Override + public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException { + state = SystemState.RECOVERING; + LOGGER.log(Level.INFO, "starting recovery ..."); + + long readableSmallestLSN = logMgr.getReadableSmallestLSN(); + Checkpoint checkpointObject = checkpointManager.getLatest(); + long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn(); + if (lowWaterMarkLSN < readableSmallestLSN) { + lowWaterMarkLSN = readableSmallestLSN; + } + + //delete any recovery files from previous failed recovery attempts + deleteRecoveryTemporaryFiles(); + + //get active partitions on this node + replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN); + } + + @Override public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) throws IOException, ACIDException { try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index 808a252..8e9463b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java 
@@ -21,11 +21,11 @@ package org.apache.asterix.app.nc; import java.util.concurrent.Callable; import java.util.logging.Logger; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.IPropertiesProvider; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.CheckpointProperties; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; @@ -70,7 +70,11 @@ public class TransactionSubsystem implements ITransactionSubsystem { this.txnProperties = txnProperties; this.transactionManager = new TransactionManager(this); this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer()); - final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); + ReplicationProperties repProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider + .getAppContext()).getReplicationProperties(); + IReplicationStrategy replicationStrategy = repProperties.getReplicationStrategy(); + final boolean replicationEnabled = repProperties.isParticipant(id); + final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id); checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled); final Checkpoint latestCheckpoint = checkpointManager.getLatest(); @@ -80,14 +84,8 @@ public class TransactionSubsystem implements ITransactionSubsystem { latestCheckpoint.getStorageVersion(), StorageConstants.VERSION)); } - ReplicationProperties asterixReplicationProperties = null; - if (asterixAppRuntimeContextProvider != null) { - asterixReplicationProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()) - 
.getReplicationProperties(); - } - - if (asterixReplicationProperties != null && replicationEnabled) { - this.logManager = new LogManagerWithReplication(this); + if (replicationEnabled) { + this.logManager = new LogManagerWithReplication(this, replicationStrategy); } else { this.logManager = new LogManager(this); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java new file mode 100644 index 0000000..d1edec0 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class BindMetadataNodeTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + private final boolean exportStub; + + public BindMetadataNodeTask(boolean exportStub) { + this.exportStub = exportStub; + } + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + try { + if (exportStub) { + runtimeContext.exportMetadataNodeStub(); + } else { + runtimeContext.unexportMetadataNodeStub(); + } + } catch (Exception e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java new file mode 100644 index 0000000..8ef6ae3 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class CheckpointTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager(); + checkpointMgr.doSharpCheckpoint(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java new file mode 100644 index 
0000000..8e842a9 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ExternalLibrarySetupTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + private final boolean metadataNode; + + public ExternalLibrarySetupTask(boolean metadataNode) { + this.metadataNode = metadataNode; + } + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + try { + 
ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode); + } catch (Exception e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java new file mode 100644 index 0000000..203e453 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc.task; + +import java.io.IOException; +import java.util.Set; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class LocalRecoveryTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + private final Set<Integer> partitions; + + public LocalRecoveryTask(Set<Integer> partitions) { + this.partitions = partitions; + } + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + try { + runtimeContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions); + } catch (IOException | ACIDException e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java new file mode 100644 index 0000000..b6d85d9 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class MetadataBootstrapTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + try { + SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState(); + appContext.initializeMetadata(state == SystemState.NEW_UNIVERSE); + } catch (Exception e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java new file mode 100644 index 0000000..4574304 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc.task; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class RemoteRecoveryTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + private final Map<String, Set<Integer>> recoveryPlan; + + public RemoteRecoveryTask(Map<String, Set<Integer>> recoveryPlan) { + this.recoveryPlan = recoveryPlan; + } + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + runtimeContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan); + } + + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java new file mode 100644 index 0000000..7c30a29 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReportMaxResourceIdTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + ReportMaxResourceIdMessage.send((NodeControllerService) cs); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java new file mode 100644 index 0000000..6ae4487 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java 
@@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class StartFailbackTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + runtimeContext.getRemoteRecoveryManager().startFailbackProcess(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java new file mode 100644 index 0000000..3beb573 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc.task; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; +import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.control.nc.application.NCApplicationContext; + +public class StartLifecycleComponentsTask implements INCLifecycleTask { + + private static final Logger LOGGER = Logger.getLogger(StartLifecycleComponentsTask.class.getName()); + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + NCApplicationContext appContext = ncs.getApplicationContext(); + MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties(); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Starting lifecycle components"); + } + Map<String, String> lifecycleMgmtConfiguration = new HashMap<>(); + String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY; + String dumpPath = metadataProperties.getCoredumpPath(appContext.getNodeId()); + lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Coredump directory for NC is: " + dumpPath); + } + 
ILifeCycleComponentManager lccm = appContext.getLifeCycleComponentManager(); + lccm.configure(lifecycleMgmtConfiguration); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Configured:" + lccm); + } + appContext.setStateDumpHandler(new AsterixStateDumpHandler(appContext.getNodeId(), lccm.getDumpPath(), lccm)); + lccm.startAll(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java new file mode 100644 index 0000000..d060f61 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class StartReplicationServiceTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + try { + //Open replication channel + runtimeContext.getReplicationChannel().start(); + final IReplicationManager replicationManager = runtimeContext.getReplicationManager(); + //Check the state of remote replicas + replicationManager.initializeReplicasState(); + //Start replication after the state of remote replicas has been initialized. 
+ replicationManager.startReplicationThreads(); + } catch (Exception e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java new file mode 100644 index 0000000..084563f --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.app.nc.task.BindMetadataNodeTask; +import org.apache.asterix.app.nc.task.CheckpointTask; +import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; +import org.apache.asterix.app.nc.task.MetadataBootstrapTask; +import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.StartFailbackTask; +import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; +import org.apache.asterix.app.nc.task.StartReplicationServiceTask; +import org.apache.asterix.app.replication.NodeFailbackPlan.FailbackPlanState; +import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage; +import org.apache.asterix.app.replication.message.CompleteFailbackResponseMessage; +import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; +import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage; +import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage; +import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; +import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage; +import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage; +import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage; +import org.apache.asterix.app.replication.message.TakeoverPartitionsResponseMessage; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import 
org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.util.FaultToleranceUtil; +import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { + + private static final Logger LOGGER = Logger.getLogger(AutoFaultToleranceStrategy.class.getName()); + private long clusterRequestId = 0; + + private Set<String> failedNodes = new HashSet<>(); + private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new LinkedList<>(); + private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new HashMap<>(); + private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = new HashMap<>();; + private String currentMetadataNode; + private boolean metadataNodeActive = false; + private IClusterStateManager clusterManager; + private ICCMessageBroker messageBroker; + private IReplicationStrategy replicationStrategy; + private Set<String> pendingStartupCompletionNodes = new HashSet<>(); + + @Override + public void notifyNodeJoin(String nodeId) throws HyracksDataException { + pendingStartupCompletionNodes.add(nodeId); + } + + @Override + public void notifyNodeFailure(String nodeId) 
throws HyracksDataException { + //if this node was waiting for failback and failed before it completed + if (failedNodes.contains(nodeId)) { + notifyFailbackPlansNodeFailure(nodeId); + revertFailedFailbackPlanEffects(); + return; + } + //an active node failed + failedNodes.add(nodeId); + clusterManager.updateNodePartitions(nodeId, false); + if (nodeId.equals(currentMetadataNode)) { + metadataNodeActive = false; + clusterManager.updateMetadataNode(nodeId, metadataNodeActive); + } + validateClusterState(); + FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE, clusterManager, messageBroker, + replicationStrategy); + notifyFailbackPlansNodeFailure(nodeId); + requestPartitionsTakeover(nodeId); + } + + private synchronized void notifyFailbackPlansNodeFailure(String nodeId) { + Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); + while (iterator.hasNext()) { + NodeFailbackPlan plan = iterator.next(); + plan.notifyNodeFailure(nodeId); + } + } + + private synchronized void revertFailedFailbackPlanEffects() { + Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); + while (iterator.hasNext()) { + NodeFailbackPlan plan = iterator.next(); + if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { + //TODO if the failing back node is still active, notify it to construct a new plan for it + iterator.remove(); + + //reassign the partitions that were supposed to be failed back to an active replica + requestPartitionsTakeover(plan.getNodeId()); + } + } + } + + private synchronized void requestPartitionsTakeover(String failedNodeId) { + //replica -> list of partitions to takeover + Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>(); + ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); + + //collect the partitions of the failed NC + List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId); + if 
(!lostPartitions.isEmpty()) { + for (ClusterPartition partition : lostPartitions) { + //find replicas for this partitions + Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId()); + //find a replica that is still active + for (String replica : partitionReplicas) { + //TODO (mhubail) currently this assigns the partition to the first found active replica. + //It needs to be modified to consider load balancing. + if (addActiveReplica(replica, partition, partitionRecoveryPlan)) { + break; + } + } + } + + if (partitionRecoveryPlan.size() == 0) { + //no active replicas were found for the failed node + LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions); + return; + } else { + LOGGER.info("Partitions to recover: " + lostPartitions); + } + //For each replica, send a request to takeover the assigned partitions + for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) { + String replica = entry.getKey(); + Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]); + long requestId = clusterRequestId++; + TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, + replica, partitionsToTakeover); + pendingTakeoverRequests.put(requestId, takeoverRequest); + try { + messageBroker.sendApplicationMessageToNC(takeoverRequest, replica); + } catch (Exception e) { + /** + * if we fail to send the request, it means the NC we tried to send the request to + * has failed. When the failure notification arrives, we will send any pending request + * that belongs to the failed NC to a different active replica. 
+ */ + LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e); + } + } + } + } + + private boolean addActiveReplica(String replica, ClusterPartition partition, + Map<String, List<Integer>> partitionRecoveryPlan) { + Map<String, Map<String, String>> activeNcConfiguration = clusterManager.getActiveNcConfiguration(); + if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) { + if (!partitionRecoveryPlan.containsKey(replica)) { + List<Integer> replicaPartitions = new ArrayList<>(); + replicaPartitions.add(partition.getPartitionId()); + partitionRecoveryPlan.put(replica, replicaPartitions); + } else { + partitionRecoveryPlan.get(replica).add(partition.getPartitionId()); + } + return true; + } + return false; + } + + private synchronized void prepareFailbackPlan(String failingBackNodeId) { + NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId); + pendingProcessingFailbackPlans.add(plan); + planId2FailbackPlanMap.put(plan.getPlanId(), plan); + + //get all partitions this node requires to resync + ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); + Set<String> nodeReplicas = replicationProperties.getNodeReplicasIds(failingBackNodeId); + clusterManager.getClusterPartitons(); + for (String replicaId : nodeReplicas) { + ClusterPartition[] nodePartitions = clusterManager.getNodePartitions(replicaId); + for (ClusterPartition partition : nodePartitions) { + plan.addParticipant(partition.getActiveNodeId()); + /** + * if the partition original node is the returning node, + * add it to the list of the partitions which will be failed back + */ + if (partition.getNodeId().equals(failingBackNodeId)) { + plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId()); + } + } + } + + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Prepared Failback plan: " + plan.toString()); + } + + processPendingFailbackPlans(); + } + + private 
synchronized void processPendingFailbackPlans() { + ClusterState state = clusterManager.getState(); + /** + * if the cluster state is not ACTIVE, then failbacks should not be processed + * since some partitions are not active + */ + if (state == ClusterState.ACTIVE) { + while (!pendingProcessingFailbackPlans.isEmpty()) { + //take the first pending failback plan + NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop(); + /** + * A plan at this stage will be in one of two states: + * 1. PREPARING -> the participants were selected but we haven't sent any request. + * 2. PENDING_ROLLBACK -> a participant failed before we send any requests + */ + if (plan.getState() == FailbackPlanState.PREPARING) { + //set the partitions that will be failed back as inactive + String failbackNode = plan.getNodeId(); + for (Integer partitionId : plan.getPartitionsToFailback()) { + //partition expected to be returned to the failing back node + clusterManager.updateClusterPartition(partitionId, failbackNode, false); + } + + /** + * if the returning node is the original metadata node, + * then metadata node will change after the failback completes + */ + String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName(); + if (originalMetadataNode.equals(failbackNode)) { + plan.setNodeToReleaseMetadataManager(currentMetadataNode); + currentMetadataNode = ""; + metadataNodeActive = false; + clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive); + } + + //force new jobs to wait + clusterManager.setState(ClusterState.REBALANCING); + handleFailbackRequests(plan, messageBroker); + /** + * wait until the current plan is completed before processing the next plan. + * when the current one completes or is reverted, the cluster state will be + * ACTIVE again, and the next failback plan (if any) will be processed. 
+ */ + break; + } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { + //this plan failed before sending any requests -> nothing to rollback + planId2FailbackPlanMap.remove(plan.getPlanId()); + } + } + } + } + + private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) { + //send requests to other nodes to complete on-going jobs and prepare partitions for failback + for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) { + try { + messageBroker.sendApplicationMessageToNC(request, request.getNodeID()); + plan.addPendingRequest(request); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e); + plan.notifyNodeFailure(request.getNodeID()); + revertFailedFailbackPlanEffects(); + break; + } + } + } + + public synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) { + List<ClusterPartition> nodePartitions = new ArrayList<>(); + ClusterPartition[] clusterPartitons = clusterManager.getClusterPartitons(); + Map<Integer, ClusterPartition> clusterPartitionsMap = new HashMap<>(); + for (ClusterPartition partition : clusterPartitons) { + clusterPartitionsMap.put(partition.getPartitionId(), partition); + } + for (ClusterPartition partition : clusterPartitons) { + if (partition.getActiveNodeId().equals(nodeId)) { + nodePartitions.add(partition); + } + } + /** + * if there is any pending takeover request this node was supposed to handle, + * it needs to be sent to a different replica + */ + List<Long> failedTakeoverRequests = new ArrayList<>(); + for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) { + if (request.getNodeId().equals(nodeId)) { + for (Integer partitionId : request.getPartitions()) { + nodePartitions.add(clusterPartitionsMap.get(partitionId)); + } + failedTakeoverRequests.add(request.getRequestId()); + } + } + + //remove failed requests + for (Long requestId : 
failedTakeoverRequests) { + pendingTakeoverRequests.remove(requestId); + } + return nodePartitions; + } + + public synchronized void process(TakeoverPartitionsResponseMessage response) throws HyracksDataException { + for (Integer partitonId : response.getPartitions()) { + clusterManager.updateClusterPartition(partitonId, response.getNodeId(), true); + } + pendingTakeoverRequests.remove(response.getRequestId()); + validateClusterState(); + } + + public synchronized void process(TakeoverMetadataNodeResponseMessage response) throws HyracksDataException { + currentMetadataNode = response.getNodeId(); + metadataNodeActive = true; + clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive); + validateClusterState(); + } + + private void validateClusterState() throws HyracksDataException { + clusterManager.refreshState(); + ClusterState newState = clusterManager.getState(); + // PENDING: all partitions are active but metadata node is not + if (newState == ClusterState.PENDING) { + requestMetadataNodeTakeover(); + } else if (newState == ClusterState.ACTIVE) { + processPendingFailbackPlans(); + } + } + + public synchronized void process(PreparePartitionsFailbackResponseMessage msg) { + NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId()); + plan.markRequestCompleted(msg.getRequestId()); + /** + * A plan at this stage will be in one of three states: + * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait). + * 2. PENDING_COMPLETION -> all responses received (time to send completion request). + * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert). 
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+            //send complete resync and takeover partitions to the failing back node
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    /**
+     * Finalizes a failback plan after the failing back node reported its completion.
+     * <p>
+     * The plan is removed from the pending plans, the failing back node is removed from the failed nodes
+     * list, its replicas are notified to reconnect to it, its partitions are set as active and the cluster
+     * state is re-validated.
+     *
+     * @param response the completion report carrying the id of the finished plan
+     * @throws HyracksDataException propagated from the helper calls made here
+     */
+    public synchronized void process(CompleteFailbackResponseMessage response) throws HyracksDataException {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
+        if (plan == null) {
+            // No plan is registered under this id (e.g. a duplicate response); there is nothing to
+            // complete, and dereferencing the missing plan would fail with a NullPointerException.
+            LOGGER.log(Level.WARNING,
+                    "Ignoring complete failback response for unknown plan: " + response.getPlanId());
+            return;
+        }
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager, messageBroker,
+                replicationStrategy);
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    /**
+     * Asks the node currently serving the metadata partition to register itself as the metadata node and
+     * rebinds the metadata node proxy. Any failure is logged as a warning and not rethrown.
+     */
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartition = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition();
+        // read the target once so that the warning below names the node the request was addressed to
+        final String takeoverNodeId = metadataPartition.getActiveNodeId();
+        //request the metadataPartition node to register itself as the metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, takeoverNodeId);
+            // Since the metadata node will be changed, we need to rebind the proxy object
+            MetadataManager.INSTANCE.rebindMetadataNode();
+        } catch (Exception e) {
+
+            /**
+             * if we fail to send the request, it means the NC we tried to send the request to
+             * has failed. When the failure notification arrives, a new NC will be assigned to
+             * the metadata partition and a new metadata node takeover request will be sent to it.
+             */
+            LOGGER.log(Level.WARNING, "Failed to send metadata node takeover request to: " + takeoverNodeId, e);
+        }
+    }
+
+    /**
+     * Creates a new {@link AutoFaultToleranceStrategy} that uses the given replication strategy and message
+     * broker. The instance this method is invoked on is left untouched.
+     *
+     * @param replicationStrategy the replication strategy assigned to the new instance
+     * @param messageBroker the message broker assigned to the new instance
+     * @return the newly created strategy
+     */
+    @Override
+    public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
+        AutoFaultToleranceStrategy ft = new AutoFaultToleranceStrategy();
+        ft.messageBroker = messageBroker;
+        ft.replicationStrategy = replicationStrategy;
+        return ft;
+    }
+
+    /**
+     * Dispatches an NC lifecycle message to the handler matching its type.
+     *
+     * @param message the message to process
+     * @throws HyracksDataException if the message type is not supported, or propagated from the handler
+     */
+    @Override
+    public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
+        switch (message.getType()) {
+            case STARTUP_TASK_REQUEST:
+                process((StartupTaskRequestMessage) message);
+                break;
+            case STARTUP_TASK_RESULT:
+                process((NCLifecycleTaskReportMessage) message);
+                break;
+            case TAKEOVER_PARTITION_RESPONSE:
+                process((TakeoverPartitionsResponseMessage) message);
+                break;
+            case TAKEOVER_METADATA_NODE_RESPONSE:
+                process((TakeoverMetadataNodeResponseMessage) message);
+                break;
+            case PREPARE_FAILBACK_RESPONSE:
+                process((PreparePartitionsFailbackResponseMessage) message);
+                break;
+            case COMPLETE_FAILBACK_RESPONSE:
+                process((CompleteFailbackResponseMessage) message);
+                break;
+            default:
+                throw new HyracksDataException("Unsupported message type: " + message.getType().name());
+        }
+    }
+
+    /**
+     * Handles the report an NC sends after running its startup tasks.
+     * <p>
+     * A failed startup is only logged. After a successful startup, a node that is in the failed nodes list
+     * gets a failback plan prepared; any other node has its partitions activated, the metadata node is
+     * updated if the reporting node is the current metadata node, and the cluster state is refreshed.
+     *
+     * @param msg the startup report
+     * @throws HyracksDataException propagated from the helper calls made here
+     */
+    private synchronized void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        pendingStartupCompletionNodes.remove(nodeId);
+        if (!msg.isSuccess()) {
+            LOGGER.log(Level.SEVERE, nodeId + " failed to complete startup. ", msg.getException());
+            return;
+        }
+        if (failedNodes.contains(nodeId)) {
+            prepareFailbackPlan(nodeId);
+            return;
+        }
+        // If this node failed and recovered, notify impacted replicas to reconnect to it
+        // NOTE(review): failedNodes did not contain nodeId in the check just above, so remove(nodeId)
+        // looks like it can never succeed here - confirm whether this branch is still needed.
+        if (replicationStrategy.isParticipant(nodeId) && failedNodes.remove(nodeId)) {
+            FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager,
+                    messageBroker, replicationStrategy);
+        }
+        clusterManager.updateNodePartitions(nodeId, true);
+        if (nodeId.equals(currentMetadataNode)) {
+            clusterManager.updateMetadataNode(currentMetadataNode, true);
+        }
+        clusterManager.refreshState();
+    }
+
+    /**
+     * Attaches the cluster state manager and caches the id of its current metadata node.
+     *
+     * @param clusterManager the cluster state manager used by this strategy
+     */
+    @Override
+    public void bindTo(IClusterStateManager clusterManager) {
+        this.clusterManager = clusterManager;
+        currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
+    }
+
+    /**
+     * Answers an NC startup request with the list of tasks the node has to run.
+     * <p>
+     * Nodes reporting {@code INITIAL_RUN} or {@code HEALTHY} receive the regular startup sequence; any
+     * other state is treated as a returning failed node and receives the failback sequence.
+     *
+     * @param msg the startup request
+     * @throws HyracksDataException if the response cannot be sent to the requesting node
+     */
+    private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+        final String nodeId = msg.getNodeId();
+        final SystemState state = msg.getState();
+        List<INCLifecycleTask> tasks;
+        if (state == SystemState.INITIAL_RUN || state == SystemState.HEALTHY) {
+            tasks = buildStartupSequence(nodeId);
+        } else {
+            // failed node returned. Need to start failback process
+            tasks = buildFailbackStartupSequence();
+        }
+        StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    /**
+     * Builds the task list sent to a node that has to go through failback.
+     *
+     * @return the failback, max resource id report and lifecycle components tasks, in that order
+     */
+    private List<INCLifecycleTask> buildFailbackStartupSequence() {
+        final List<INCLifecycleTask> tasks = new ArrayList<>();
+        tasks.add(new StartFailbackTask());
+        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new StartLifecycleComponentsTask());
+        return tasks;
+    }
+
+    /**
+     * Builds the regular startup task list for a node. The metadata bootstrap and metadata node binding
+     * tasks are only included when {@code nodeId} is the current metadata node.
+     *
+     * @param nodeId the id of the node that is starting up
+     * @return the ordered startup tasks for the node
+     */
+    private List<INCLifecycleTask> buildStartupSequence(String nodeId) {
+        final List<INCLifecycleTask> tasks = new ArrayList<>();
+        tasks.add(new StartReplicationServiceTask());
+        final boolean isMetadataNode = nodeId.equals(currentMetadataNode);
+        if (isMetadataNode) {
+            tasks.add(new MetadataBootstrapTask());
+        }
+        tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
+        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new CheckpointTask());
+        tasks.add(new StartLifecycleComponentsTask());
+        if (isMetadataNode) {
+            tasks.add(new BindMetadataNodeTask(true));
+        }
+        return tasks;
+    }
+}
\ No newline at
end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
new file mode 100644
index 0000000..8d382a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+/**
+ * Factory that instantiates the {@link IFaultToleranceStrategy} named in the high-availability section of a
+ * cluster description.
+ */
+public class FaultToleranceStrategyFactory {
+
+    /** Built-in strategy classes, keyed by their lower-case configuration name. */
+    private static final Map<String, Class<? extends IFaultToleranceStrategy>>
+    BUILT_IN_FAULT_TOLERANCE_STRATEGY = new HashMap<>();
+
+    static {
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance", NoFaultToleranceStrategy.class);
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("metadata_node", MetadataNodeFaultToleranceStrategy.class);
+        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("auto", AutoFaultToleranceStrategy.class);
+    }
+
+    /** Not instantiable: the class only exposes a static factory method. */
+    private FaultToleranceStrategyFactory() {
+        throw new AssertionError();
+    }
+
+    /**
+     * Creates the fault-tolerance strategy configured for {@code cluster}.
+     * <p>
+     * A {@link NoFaultToleranceStrategy} is used when high availability is missing or not enabled, or when no
+     * fault-tolerance strategy name is configured. Strategy names are matched case-insensitively.
+     *
+     * @param cluster the cluster description holding the high-availability configuration
+     * @param repStrategy the replication strategy handed to the created strategy
+     * @param messageBroker the message broker handed to the created strategy
+     * @return the instance returned by the selected strategy's {@code from(repStrategy, messageBroker)}
+     * @throws IllegalArgumentException if the configured name is not one of the built-in strategies
+     * @throws IllegalStateException if the selected strategy class cannot be instantiated
+     */
+    public static IFaultToleranceStrategy create(Cluster cluster, IReplicationStrategy repStrategy,
+            ICCMessageBroker messageBroker) {
+        boolean highAvailabilityEnabled = cluster.getHighAvailability() != null
+                && cluster.getHighAvailability().getEnabled() != null
+                && Boolean.valueOf(cluster.getHighAvailability().getEnabled());
+
+        if (!highAvailabilityEnabled || cluster.getHighAvailability().getFaultTolerance() == null
+                || cluster.getHighAvailability().getFaultTolerance().getStrategy() == null) {
+            return new NoFaultToleranceStrategy().from(repStrategy, messageBroker);
+        }
+        // Locale.ROOT keeps the lookup key independent of the JVM's default locale
+        String strategyName =
+                cluster.getHighAvailability().getFaultTolerance().getStrategy().toLowerCase(Locale.ROOT);
+        Class<? extends IFaultToleranceStrategy> clazz = BUILT_IN_FAULT_TOLERANCE_STRATEGY.get(strategyName);
+        if (clazz == null) {
+            throw new IllegalArgumentException(
+                    String.format("Unsupported Fault Tolerance Strategy. Available types: %s",
+                            BUILT_IN_FAULT_TOLERANCE_STRATEGY.keySet().toString()));
+        }
+        try {
+            // Class#newInstance is deprecated; instantiate through the no-arg constructor instead
+            return clazz.getDeclaredConstructor().newInstance().from(repStrategy, messageBroker);
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file
