http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index bc270df..ce57648 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -18,27 +18,17 @@ */ package org.apache.asterix.hyracks.bootstrap; -import java.io.File; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.app.nc.NCAppRuntimeContext; +import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.asterix.common.config.AsterixExtension; -import org.apache.asterix.common.config.MetadataProperties; -import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.IPropertiesProvider; import org.apache.asterix.common.config.MessagingProperties; -import org.apache.asterix.common.replication.IRemoteRecoveryManager; -import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.config.StorageProperties; +import 
org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.utils.PrintUtil; @@ -47,39 +37,39 @@ import org.apache.asterix.event.schema.cluster.Cluster; import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.messaging.MessagingChannelInterfaceFactory; import org.apache.asterix.messaging.NCMessageBroker; -import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.application.INCApplicationEntryPoint; import org.apache.hyracks.api.job.resource.NodeCapacity; -import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; -import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.control.nc.NodeControllerService; import org.kohsuke.args4j.CmdLineException; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + public class NCApplicationEntryPoint implements INCApplicationEntryPoint { private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName()); - @Option(name = "-initial-run", - usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false) + @Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started " + + "(default: false)", required = false) public boolean initialRun = false; - @Option(name = "-virtual-NC", - usage = "A flag indicating if this NC is running on virtual 
cluster " + "(default: false)", - required = false) + @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster " + + "(default: false)", required = false) public boolean virtualNC = false; private INCApplicationContext ncApplicationContext = null; private IAppRuntimeContext runtimeContext; private String nodeId; - private boolean isMetadataNode = false; private boolean stopInitiated = false; - private SystemState systemState = SystemState.NEW_UNIVERSE; - private boolean pendingFailbackCompletion = false; + private SystemState systemState; private IMessageBroker messageBroker; @Override @@ -93,8 +83,8 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { parser.printUsage(System.err); throw e; } - ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(), - ncAppCtx.getLifeCycleComponentManager())); + ncAppCtx.setThreadFactory( + new AsterixThreadFactory(ncAppCtx.getThreadFactory(), ncAppCtx.getLifeCycleComponentManager())); ncApplicationContext = ncAppCtx; nodeId = ncApplicationContext.getNodeId(); if (LOGGER.isLoggable(Level.INFO)) { @@ -104,12 +94,11 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService(); if (System.getProperty("java.rmi.server.hostname") == null) { - System.setProperty("java.rmi.server.hostname", (controllerService) - .getConfiguration().clusterNetPublicIPAddress); + System.setProperty("java.rmi.server.hostname", + (controllerService).getConfiguration().clusterNetPublicIPAddress); } runtimeContext = new NCAppRuntimeContext(ncApplicationContext, getExtensions()); - MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext) - .getMetadataProperties(); + MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties(); if 
(!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Substitute node joining : " + ncApplicationContext.getNodeId()); @@ -118,69 +107,35 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { } runtimeContext.initialize(initialRun); ncApplicationContext.setApplicationObject(runtimeContext); - MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext) - .getMessagingProperties(); + MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext).getMessagingProperties(); messageBroker = new NCMessageBroker(controllerService, messagingProperties); ncApplicationContext.setMessageBroker(messageBroker); MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory( (NCMessageBroker) messageBroker, messagingProperties); ncApplicationContext.setMessagingChannelInterfaceFactory(interfaceFactory); - boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); - boolean autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled(); - if (initialRun) { - LOGGER.info("System is being initialized. 
(first run)"); - } else { - IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); - systemState = recoveryMgr.getSystemState(); + IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); + systemState = recoveryMgr.getSystemState(); + if (systemState == SystemState.NEW_UNIVERSE) { if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("System is in a state: " + systemState); - } - - //do not attempt to perform remote recovery if this is a virtual NC - if (autoFailover && !virtualNC) { - if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) { - //Start failback process - IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager(); - remoteRecoveryMgr.startFailbackProcess(); - systemState = SystemState.RECOVERING; - pendingFailbackCompletion = true; - } - } else { - //recover if the system is corrupted by checking system state. - if (systemState == SystemState.CORRUPTED) { - recoveryMgr.startRecovery(true); - } + LOGGER.info("System state: " + SystemState.NEW_UNIVERSE); + LOGGER.info("Node ID: " + nodeId); + LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores())); + LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]); } + PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository(); + localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName()); } - /** - * if the node pending failback completion, the replication channel - * should not be opened to avoid other nodes connecting to it before - * the node completes its failback. CC will notify other replicas once - * this node is ready to receive replication requests. 
- */ - if (replicationEnabled && !pendingFailbackCompletion) { - startReplicationService(); - } + performLocalCleanUp(); } protected List<AsterixExtension> getExtensions() { return Collections.emptyList(); } - private void startReplicationService() throws InterruptedException { - //Open replication channel - runtimeContext.getReplicationChannel().start(); - - //Check the state of remote replicas - runtimeContext.getReplicationManager().initializeReplicasState(); - - //Start replication after the state of remote replicas has been initialized. - runtimeContext.getReplicationManager().startReplicationThreads(); - } - @Override public void stop() throws Exception { if (!stopInitiated) { @@ -204,63 +159,13 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { @Override public void notifyStartupComplete() throws Exception { - //Send max resource id on this NC to the CC - ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService()); - MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext) - .getMetadataProperties(); - if (initialRun || systemState == SystemState.NEW_UNIVERSE) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("System state: " + SystemState.NEW_UNIVERSE); - LOGGER.info("Node ID: " + nodeId); - LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores())); - LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]); - } - - PersistentLocalResourceRepository localResourceRepository = - (PersistentLocalResourceRepository) runtimeContext - .getLocalResourceRepository(); - localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName()); - } - - isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName()); - if (isMetadataNode && !pendingFailbackCompletion) { - runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE); + // Since we don't pass initial run flag in 
AsterixHyracksIntegrationUtil, we use the virtualNC flag + if (systemState == SystemState.NEW_UNIVERSE && (initialRun || virtualNC)) { + systemState = SystemState.INITIAL_RUN; } - ExternalLibraryUtils.setUpExternaLibraries(runtimeContext.getLibraryManager(), - isMetadataNode && !pendingFailbackCompletion); - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Starting lifecycle components"); - } - - Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>(); - String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY; - String dumpPath = metadataProperties.getCoredumpPath(nodeId); - lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Coredump directory for NC is: " + dumpPath); - } - ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager(); - lccm.configure(lifecycleMgmtConfiguration); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Configured:" + lccm); - } - ncApplicationContext.setStateDumpHandler( - new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm.getDumpPath(), lccm)); - - lccm.startAll(); - - if (!pendingFailbackCompletion) { - ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager(); - checkpointMgr.doSharpCheckpoint(); - - if (isMetadataNode) { - runtimeContext.exportMetadataNodeStub(); - } - } - - //Clean any temporary files - performLocalCleanUp(); + // Request startup tasks from CC + StartupTaskRequestMessage.send((NodeControllerService) ncApplicationContext.getControllerService(), + systemState); } @Override @@ -296,8 +201,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { } private void updateOnNodeJoin() { - MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext) - .getMetadataProperties(); + MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties(); if 
(!metadataProperties.getNodeNames().contains(nodeId)) { metadataProperties.getNodeNames().add(nodeId); Cluster cluster = ClusterProperties.INSTANCE.getCluster(); @@ -305,8 +209,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { throw new IllegalStateException("No cluster configuration found for this instance"); } String asterixInstanceName = metadataProperties.getInstanceName(); - TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext) - .getTransactionProperties(); + TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext).getTransactionProperties(); Node self = null; List<Node> nodes; if (cluster.getSubstituteNodes() != null) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java new file mode 100644 index 0000000..0a9a215 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.util; + +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.replication.Replica; +import org.apache.asterix.runtime.message.ReplicaEventMessage; +import org.apache.commons.lang3.StringUtils; +import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; + +public class FaultToleranceUtil { + + private static final Logger LOGGER = Logger.getLogger(FaultToleranceUtil.class.getName()); + private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address"; + + private FaultToleranceUtil() { + throw new AssertionError(); + } + + public static void notifyImpactedReplicas(String nodeId, ClusterEventType event, + IClusterStateManager clusterManager, ICCMessageBroker messageBroker, + IReplicationStrategy replicationStrategy) { + List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream() + .map(Replica::getId).collect(Collectors.toList()); + String nodeIdAddress = StringUtils.EMPTY; + Map<String, Map<String, String>> activeNcConfiguration = clusterManager.getActiveNcConfiguration(); + // In case the node joined with a new IP address, we need to send it to the other replicas + if (event == ClusterEventType.NODE_JOIN) { + nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY); + } + ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event); + for (String replica : primaryRemoteReplicas) { + // If the remote replica is alive, send the event + if (activeNcConfiguration.containsKey(replica)) { + try { + messageBroker.sendApplicationMessageToNC(msg, replica); + } catch (Exception e) { + 
LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/resources/cluster.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml index 8f0b694..7f78b26 100644 --- a/asterixdb/asterix-app/src/main/resources/cluster.xml +++ b/asterixdb/asterix-app/src/main/resources/cluster.xml @@ -19,14 +19,22 @@ <cluster xmlns="cluster"> <instance_name>asterix</instance_name> <store>storage</store> + <metadata_node>nc1</metadata_node> - <data_replication> + <high_availability> <enabled>false</enabled> - <replication_port>2016</replication_port> - <replication_factor>2</replication_factor> - <auto_failover>false</auto_failover> - <replication_time_out>30</replication_time_out> - </data_replication> + <data_replication> + <strategy>metadata_only</strategy> + <replication_port>2016</replication_port> + <replication_time_out>30</replication_time_out> + </data_replication> + <fault_tolerance> + <strategy>metadata_node</strategy> + <replica> + <node_id>nc2</node_id> + </replica> + </fault_tolerance> + </high_availability> <master_node> <id>master</id> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm index 74e019b..a614faa 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm @@ -1,11 +1,9 @@ { "config" : { - "enabled" : false, - "factor" : 2, "log.batchsize" : 4096, "log.buffer.numpages" : 8, "log.buffer.pagesize" : 131072, "max.remote.recovery.attempts" : 5, "timeout" : 30 } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 04e30ff..ae19f23 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.context.IndexInfo; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.api.IIndex; import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager; @@ -102,4 +103,12 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd * @throws HyracksDataException */ void closeUserDatasets() throws HyracksDataException; + + /** + * Flushes all opened datasets that are matching {@code replicationStrategy}. 
+ * + * @param replicationStrategy + * @throws HyracksDataException + */ + void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java new file mode 100644 index 0000000..c30e999 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import java.io.Serializable; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +@FunctionalInterface +public interface INCLifecycleTask extends Serializable { + + /** + * Performs the task. 
+ * + * @param cs + * @throws HyracksDataException + */ + void perform(IControllerService cs) throws HyracksDataException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java new file mode 100644 index 0000000..d971f48 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.cluster; + +import java.util.Map; + +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IClusterStateManager { + + /** + * @return The current cluster state. 
+ */ + ClusterState getState(); + + /** + * Updates the cluster state based on the state of all cluster partitions and the metadata node. + * Cluster state after refresh: + * ACTIVE: all cluster partitions are active and the metadata node is bound. + * PENDING: all cluster partitions are active but the metadata node is not bound. + * UNUSABLE: one or more cluster partitions are not active. + */ + void refreshState() throws HyracksDataException; + + /** + * Sets the cluster state into {@code state} + */ + void setState(ClusterState state); + + /** + * Updates all partitions of {@code nodeId} based on the {@code active} flag. + * @param nodeId + * @param active + * @throws HyracksDataException + */ + void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException; + + /** + * Updates the active node and active state of the cluster partition with id {@code partitionNum} + */ + void updateClusterPartition(Integer partitionNum, String activeNode, boolean active); + + /** + * Updates the metadata node id and its state. + */ + void updateMetadataNode(String nodeId, boolean active); + + /** + * @return a map of nodeId and NC Configuration for active nodes. + */ + Map<String, Map<String, String>> getActiveNcConfiguration(); + + /** + * @return The current metadata node Id. + */ + String getCurrentMetadataNodeId(); + + /** + * @param nodeId + * @return The node originally assigned partitions. + */ + ClusterPartition[] getNodePartitions(String nodeId); + + /** + * @return A copy of the current state of the cluster partitions. 
+ */ + ClusterPartition[] getClusterPartitons(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java index 81c5a6d..980ad24 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java @@ -19,21 +19,27 @@ package org.apache.asterix.common.config; import java.io.InputStream; +import java.util.Optional; +import java.util.stream.Collectors; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; +import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.replication.ReplicationStrategyFactory; import org.apache.asterix.event.schema.cluster.Cluster; +import org.apache.asterix.event.schema.cluster.Node; +import org.apache.asterix.event.schema.cluster.Replica; +import org.apache.commons.lang3.StringUtils; public class ClusterProperties { public static final ClusterProperties INSTANCE = new ClusterProperties(); - private static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml"; private static final String DEFAULT_STORAGE_DIR_NAME = "storage"; - - private final Cluster cluster; + private String nodeNamePrefix = StringUtils.EMPTY; + private Cluster cluster; private ClusterProperties() { InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE); @@ -42,11 +48,11 @@ public class ClusterProperties { JAXBContext ctx = JAXBContext.newInstance(Cluster.class); Unmarshaller unmarshaller = ctx.createUnmarshaller(); cluster 
= (Cluster) unmarshaller.unmarshal(is); + nodeNamePrefix = cluster.getInstanceName() + "_"; + updateNodeIdToFullName(); } catch (JAXBException e) { throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e); } - } else { - cluster = null; } } @@ -62,14 +68,41 @@ public class ClusterProperties { return DEFAULT_STORAGE_DIR_NAME; } - public boolean isReplicationEnabled() { - if (cluster != null && cluster.getDataReplication() != null) { - return cluster.getDataReplication().isEnabled(); + public Node getNodeById(String nodeId) { + Optional<Node> matchingNode = cluster.getNode().stream().filter(node -> node.getId().equals(nodeId)).findAny(); + return matchingNode.isPresent() ? matchingNode.get() : null; + } + + public int getNodeIndex(String nodeId) { + for (int i = 0; i < cluster.getNode().size(); i++) { + Node node = cluster.getNode().get(i); + if (node.getId().equals(nodeId)) { + return i; + } + } + return -1; + } + + public IReplicationStrategy getReplicationStrategy() { + return ReplicationStrategyFactory.create(cluster); + } + + private String getNodeFullName(String nodeId) { + if (nodeId.startsWith(nodeNamePrefix)) { + return nodeId; } - return false; + return nodeNamePrefix + nodeId; } - public boolean isAutoFailoverEnabled() { - return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover(); + private void updateNodeIdToFullName() { + cluster.getNode().forEach(node -> node.setId(getNodeFullName(node.getId()))); + if (cluster.getMetadataNode() != null) { + cluster.setMetadataNode(getNodeFullName(cluster.getMetadataNode())); + } + if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getFaultTolerance() != null + && cluster.getHighAvailability().getFaultTolerance().getReplica() != null) { + Replica replicas = cluster.getHighAvailability().getFaultTolerance().getReplica(); + replicas.setNodeId(replicas.getNodeId().stream().map(this::getNodeFullName).collect(Collectors.toList())); + } 
} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java index 164a525..cf2ce4f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java @@ -18,11 +18,10 @@ */ package org.apache.asterix.common.config; -import java.util.HashSet; import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.stream.Collectors; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.Replica; import org.apache.asterix.event.schema.cluster.Cluster; import org.apache.asterix.event.schema.cluster.Node; @@ -31,16 +30,8 @@ import org.apache.hyracks.util.StorageUtil.StorageUnit; public class ReplicationProperties extends AbstractProperties { - private static final Logger LOGGER = Logger.getLogger(ReplicationProperties.class.getName()); - - private static final int REPLICATION_DATAPORT_DEFAULT = 2000; - private static final String REPLICATION_ENABLED_KEY = "replication.enabled"; - - private static final String REPLICATION_FACTOR_KEY = "replication.factor"; - private static final int REPLICATION_FACTOR_DEFAULT = 1; - private static final String REPLICATION_TIMEOUT_KEY = "replication.timeout"; private static final int REPLICATION_TIME_OUT_DEFAULT = 15; @@ -60,205 +51,60 @@ public class ReplicationProperties extends AbstractProperties { private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = 
StorageUtil.getSizeInBytes(128, StorageUnit.KILOBYTE); - private final String nodeNamePrefix; private final Cluster cluster; + private final IReplicationStrategy repStrategy; public ReplicationProperties(PropertiesAccessor accessor) { super(accessor); this.cluster = ClusterProperties.INSTANCE.getCluster(); - - if (cluster != null) { - nodeNamePrefix = cluster.getInstanceName() + "_"; - } else { - nodeNamePrefix = ""; - } - } - - @PropertyKey(REPLICATION_ENABLED_KEY) - public boolean isReplicationEnabled() { - return ClusterProperties.INSTANCE.isReplicationEnabled(); + this.repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); } public String getReplicaIPAddress(String nodeId) { - if (cluster != null) { - for (int i = 0; i < cluster.getNode().size(); i++) { - Node node = cluster.getNode().get(i); - if (getRealCluserNodeID(node.getId()).equals(nodeId)) { - return node.getClusterIp(); - } - } - } - return NODE_IP_ADDRESS_DEFAULT; + Node node = ClusterProperties.INSTANCE.getNodeById(nodeId); + return node != null ? node.getClusterIp() : NODE_IP_ADDRESS_DEFAULT; } public int getDataReplicationPort(String nodeId) { - if (cluster != null && cluster.getDataReplication() != null) { - for (int i = 0; i < cluster.getNode().size(); i++) { - Node node = cluster.getNode().get(i); - if (getRealCluserNodeID(node.getId()).equals(nodeId)) { - return node.getReplicationPort() != null ? node.getReplicationPort().intValue() - : cluster.getDataReplication().getReplicationPort().intValue(); - } - } + Node node = ClusterProperties.INSTANCE.getNodeById(nodeId); + if (node != null) { + return node.getReplicationPort() != null ? 
node.getReplicationPort().intValue() + : cluster.getHighAvailability().getDataReplication().getReplicationPort().intValue(); } return REPLICATION_DATAPORT_DEFAULT; } - public Set<Replica> getRemoteReplicas(String nodeId) { - Set<Replica> remoteReplicas = new HashSet<>();; - - int numberOfRemoteReplicas = getReplicationFactor() - 1; - //Using chained-declustering - if (cluster != null) { - int nodeIndex = -1; - //find the node index in the cluster config - for (int i = 0; i < cluster.getNode().size(); i++) { - Node node = cluster.getNode().get(i); - if (getRealCluserNodeID(node.getId()).equals(nodeId)) { - nodeIndex = i; - break; - } - } - - if (nodeIndex == -1) { - LOGGER.log(Level.WARNING, - "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations"); - return null; - } - - //find nodes to the right of this node - for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) { - remoteReplicas.add(getReplicaByNodeIndex(i)); - if (remoteReplicas.size() == numberOfRemoteReplicas) { - break; - } - } - - //if not all remote replicas have been found, start from the beginning - if (remoteReplicas.size() != numberOfRemoteReplicas) { - for (int i = 0; i < cluster.getNode().size(); i++) { - remoteReplicas.add(getReplicaByNodeIndex(i)); - if (remoteReplicas.size() == numberOfRemoteReplicas) { - break; - } - } - } - } - return remoteReplicas; - } - - private Replica getReplicaByNodeIndex(int nodeIndex) { - Node node = cluster.getNode().get(nodeIndex); - Node replicaNode = new Node(); - replicaNode.setId(getRealCluserNodeID(node.getId())); - replicaNode.setClusterIp(node.getClusterIp()); - return new Replica(replicaNode); - } - public Replica getReplicaById(String nodeId) { - int nodeIndex = -1; - if (cluster != null) { - for (int i = 0; i < cluster.getNode().size(); i++) { - Node node = cluster.getNode().get(i); - - if (getRealCluserNodeID(node.getId()).equals(nodeId)) { - nodeIndex = i; - break; - } - } + Node node = 
ClusterProperties.INSTANCE.getNodeById(nodeId); + if (node != null) { + return new Replica(node); } - - if (nodeIndex < 0) { - return null; - } - - return getReplicaByNodeIndex(nodeIndex); + return null; } public Set<String> getRemoteReplicasIds(String nodeId) { - Set<String> remoteReplicasIds = new HashSet<>(); - Set<Replica> remoteReplicas = getRemoteReplicas(nodeId); - - for (Replica replica : remoteReplicas) { - remoteReplicasIds.add(replica.getId()); - } - - return remoteReplicasIds; + return repStrategy.getRemoteReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet()); } - public String getRealCluserNodeID(String nodeId) { - return nodeNamePrefix + nodeId; + public Set<String> getRemotePrimaryReplicasIds(String nodeId) { + return repStrategy.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet()); } public Set<String> getNodeReplicasIds(String nodeId) { - Set<String> replicaIds = new HashSet<>(); - replicaIds.add(nodeId); - replicaIds.addAll(getRemoteReplicasIds(nodeId)); - return replicaIds; - } - - @PropertyKey(REPLICATION_FACTOR_KEY) - public int getReplicationFactor() { - if (cluster != null) { - if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) { - return REPLICATION_FACTOR_DEFAULT; - } - return cluster.getDataReplication().getReplicationFactor().intValue(); - } - return REPLICATION_FACTOR_DEFAULT; + Set<String> remoteReplicasIds = getRemoteReplicasIds(nodeId); + // This includes the node itself + remoteReplicasIds.add(nodeId); + return remoteReplicasIds; } @PropertyKey(REPLICATION_TIMEOUT_KEY) public int getReplicationTimeOut() { if (cluster != null) { - return cluster.getDataReplication().getReplicationTimeOut().intValue(); + return cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue(); } return REPLICATION_TIME_OUT_DEFAULT; } - /** - * @param nodeId - * @return The set of nodes which replicate to this node, 
including the node itself - */ - public Set<String> getNodeReplicationClients(String nodeId) { - Set<String> clientReplicas = new HashSet<>(); - clientReplicas.add(nodeId); - - int clientsCount = getReplicationFactor(); - - //Using chained-declustering backwards - if (cluster != null) { - int nodeIndex = -1; - //find the node index in the cluster config - for (int i = 0; i < cluster.getNode().size(); i++) { - Node node = cluster.getNode().get(i); - if (getRealCluserNodeID(node.getId()).equals(nodeId)) { - nodeIndex = i; - break; - } - } - - //find nodes to the left of this node - for (int i = nodeIndex - 1; i >= 0; i--) { - clientReplicas.add(getReplicaByNodeIndex(i).getId()); - if (clientReplicas.size() == clientsCount) { - break; - } - } - - //if not all client replicas have been found, start from the end - if (clientReplicas.size() != clientsCount) { - for (int i = cluster.getNode().size() - 1; i >= 0; i--) { - clientReplicas.add(getReplicaByNodeIndex(i).getId()); - if (clientReplicas.size() == clientsCount) { - break; - } - } - } - } - return clientReplicas; - } - @PropertyKey(REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY) public int getMaxRemoteRecoveryAttempts() { return MAX_REMOTE_RECOVERY_ATTEMPTS; @@ -281,4 +127,12 @@ public class ReplicationProperties extends AbstractProperties { return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT, PropertyInterpreters.getIntegerBytePropertyInterpreter()); } -} + + public boolean isParticipant(String nodeId) { + return repStrategy.isParticipant(nodeId); + } + + public IReplicationStrategy getReplicationStrategy() { + return repStrategy; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index f49b07a..cbb4868 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -30,6 +30,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.Resource; @@ -571,4 +572,13 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC public int getNumPartitions() { return numPartitions; } + + @Override + public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException { + for (DatasetResource dsr : datasets.values()) { + if (replicationStrategy.isMatch(dsr.getDatasetID())) { + flushDatasetOpenIndexes(dsr.getDatasetInfo(), false); + } + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java new file mode 100644 index 0000000..ad326b2 --- /dev/null +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.replication; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.event.schema.cluster.Cluster; + +public class ChainedDeclusteringReplicationStrategy implements IReplicationStrategy { + + private static final Logger LOGGER = Logger.getLogger(ChainedDeclusteringReplicationStrategy.class.getName()); + private int replicationFactor; + + @Override + public boolean isMatch(int datasetId) { + return true; + } + + @Override + public Set<Replica> getRemoteReplicas(String nodeId) { + Set<Replica> remoteReplicas = new HashSet<>(); + Cluster cluster = ClusterProperties.INSTANCE.getCluster(); + int numberOfRemoteReplicas = replicationFactor - 1; + int nodeIndex = ClusterProperties.INSTANCE.getNodeIndex(nodeId); + + if (nodeIndex == -1) { + LOGGER.log(Level.WARNING, "Could not find node " + nodeId + " in cluster 
configurations"); + return Collections.emptySet(); + } + + //find nodes to the right of this node + while (remoteReplicas.size() != numberOfRemoteReplicas) { + remoteReplicas.add(new Replica(cluster.getNode().get(++nodeIndex % cluster.getNode().size()))); + } + + return remoteReplicas; + } + + @Override + public Set<Replica> getRemotePrimaryReplicas(String nodeId) { + Set<Replica> clientReplicas = new HashSet<>(); + Cluster cluster = ClusterProperties.INSTANCE.getCluster(); + final int remotePrimaryReplicasCount = replicationFactor - 1; + + int nodeIndex = ClusterProperties.INSTANCE.getNodeIndex(nodeId); + + //find nodes to the left of this node (floorMod wraps a negative index around to the end of the node list) + while (clientReplicas.size() != remotePrimaryReplicasCount) { + clientReplicas.add(new Replica(cluster.getNode().get(Math.floorMod(--nodeIndex, cluster.getNode().size())))); + } + + return clientReplicas; + } + + @Override + public ChainedDeclusteringReplicationStrategy from(Cluster cluster) { + if (cluster.getHighAvailability().getDataReplication().getReplicationFactor() == null) { + throw new IllegalStateException("Replication factor must be specified."); + } + ChainedDeclusteringReplicationStrategy cd = new ChainedDeclusteringReplicationStrategy(); + cd.replicationFactor = cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue(); + return cd; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java new file mode 100644 index 0000000..46d5d98 --- /dev/null +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.replication; + +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IFaultToleranceStrategy { + + /** + * Defines the logic of a {@link IFaultToleranceStrategy} when a node joins the cluster. + * + * @param nodeId + * @throws HyracksDataException + */ + void notifyNodeJoin(String nodeId) throws HyracksDataException; + + /** + * Defines the logic of a {@link IFaultToleranceStrategy} when a node leaves the cluster. + * + * @param nodeId + * @throws HyracksDataException + */ + void notifyNodeFailure(String nodeId) throws HyracksDataException; + + /** + * Binds the fault tolerance strategy to {@code clusterManager}. + * + * @param clusterManager + */ + void bindTo(IClusterStateManager clusterManager); + + /** + * Processes {@code message} based on the message type. 
+ * + * @param message + * @throws HyracksDataException + */ + void process(INCLifecycleMessage message) throws HyracksDataException; + + /** + * Constructs a fault tolerance strategy. + * + * @param replicationStrategy + * @param messageBroker + * @return + */ + IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java new file mode 100644 index 0000000..c19b0aa --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.replication; + +import org.apache.asterix.common.messaging.api.IApplicationMessage; + +public interface INCLifecycleMessage extends IApplicationMessage { + + public enum MessageType { + REPLAY_LOGS_REQUEST, + REPLAY_LOGS_RESPONSE, + PREPARE_FAILBACK_REQUEST, + PREPARE_FAILBACK_RESPONSE, + COMPLETE_FAILBACK_REQUEST, + COMPLETE_FAILBACK_RESPONSE, + STARTUP_TASK_REQUEST, + STARTUP_TASK_RESPONSE, + STARTUP_TASK_RESULT, + TAKEOVER_PARTITION_REQUEST, + TAKEOVER_PARTITION_RESPONSE, + TAKEOVER_METADATA_NODE_REQUEST, + TAKEOVER_METADATA_NODE_RESPONSE + } + + /** + * @return The message type. + */ + MessageType getType(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java index 9f9d74b..51b826b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java @@ -19,8 +19,11 @@ package org.apache.asterix.common.replication; import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IRemoteRecoveryManager { @@ -46,4 +49,23 @@ public interface IRemoteRecoveryManager { * @throws InterruptedException */ public void completeFailbackProcess() throws IOException, InterruptedException; + + /** + * Replays all committed jobs logs for {@code partitions}. 
Optionally, flushes all datasets + * to convert the replayed logs into LSM Components. + * + * @param partitions + * @param flush + * @throws HyracksDataException + */ + void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException; + + /** + * Performs the remote recovery plan by requesting data from each specified node + * for each partitions specified. + * + * @param recoveryPlan + * @throws HyracksDataException + */ + void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java index 6bd1505..b969bef 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java @@ -50,13 +50,13 @@ public interface IReplicationManager extends IIOReplicationManager { * * @param remoteReplicaId * The replica id to send the request to. - * @param replicasDataToRecover - * Get files that belong to those replicas. + * @param partitionsToRecover + * Get files that belong to those partitions. 
* @param existingFiles * a list of already existing files on the requester * @throws IOException */ - public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover, + public void requestReplicaFiles(String remoteReplicaId, Set<Integer> partitionsToRecover, Set<String> existingFiles) throws IOException; /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java new file mode 100644 index 0000000..f65f6ac --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.replication; + +import java.util.Set; + +import org.apache.asterix.event.schema.cluster.Cluster; + +public interface IReplicationStrategy { + + /** + * @param datasetId + * @return True, if the dataset should be replicated. Otherwise false. + */ + boolean isMatch(int datasetId); + + /** + * @param nodeId + * @return The set of nodes that replicate data on {@code nodeId}. + */ + Set<Replica> getRemotePrimaryReplicas(String nodeId); + + /** + * @param node + * @return The set of nodes that {@code node} replicates data to. + */ + Set<Replica> getRemoteReplicas(String node); + + /** + * @param nodeId + * @return true if {@code nodeId} has any remote primary replica or remote replica. Otherwise false. + */ + default boolean isParticipant(String nodeId) { + return !getRemoteReplicas(nodeId).isEmpty() || !getRemotePrimaryReplicas(nodeId).isEmpty(); + } + + /** + * @param cluster + * @return A replication strategy based on the passed configurations. + */ + IReplicationStrategy from(Cluster cluster); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java new file mode 100644 index 0000000..711f06d --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.replication; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.event.schema.cluster.Cluster; +import org.apache.asterix.event.schema.cluster.Node; + +public class MetadataOnlyReplicationStrategy implements IReplicationStrategy { + + private String metadataNodeId; + private Replica metadataPrimaryReplica; + private Set<Replica> metadataNodeReplicas; + + @Override + public boolean isMatch(int datasetId) { + return datasetId < MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID && datasetId >= 0; + } + + @Override + public Set<Replica> getRemoteReplicas(String nodeId) { + if (nodeId.equals(metadataNodeId)) { + return metadataNodeReplicas; + } + return Collections.emptySet(); + } + + @Override + public Set<Replica> getRemotePrimaryReplicas(String nodeId) { + if (metadataNodeReplicas.stream().map(Replica::getId).filter(replicaId -> replicaId.equals(nodeId)) + .count() != 0) { + return new HashSet<>(Arrays.asList(metadataPrimaryReplica)); + } + return Collections.emptySet(); + } + + @Override + public MetadataOnlyReplicationStrategy from(Cluster cluster) { + if (cluster.getMetadataNode() == null) { + throw new 
IllegalStateException("Metadata node must be specified."); + } + + Node metadataNode = ClusterProperties.INSTANCE.getNodeById(cluster.getMetadataNode()); + if (metadataNode == null) { + throw new IllegalStateException("Invalid metadata node specified"); + } + + if (cluster.getHighAvailability().getFaultTolerance().getReplica() == null + || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId() == null + || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId().isEmpty()) { + throw new IllegalStateException("One or more replicas must be specified for metadata node."); + } + + final Set<Replica> replicas = new HashSet<>(); + for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) { + Node node = ClusterProperties.INSTANCE.getNodeById(nodeId); + if (node == null) { + throw new IllegalStateException("Invalid replica specified: " + nodeId); + } + replicas.add(new Replica(node)); + } + MetadataOnlyReplicationStrategy st = new MetadataOnlyReplicationStrategy(); + st.metadataNodeId = cluster.getMetadataNode(); + st.metadataPrimaryReplica = new Replica(metadataNode); + st.metadataNodeReplicas = replicas; + return st; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java new file mode 100644 index 0000000..43347f6 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.replication; + +import java.util.Collections; +import java.util.Set; + +import org.apache.asterix.event.schema.cluster.Cluster; + +public class NoReplicationStrategy implements IReplicationStrategy { + + @Override + public boolean isMatch(int datasetId) { + return false; + } + + @Override + public boolean isParticipant(String nodeId) { + return false; + } + + @Override + public Set<Replica> getRemotePrimaryReplicas(String nodeId) { + return Collections.emptySet(); + } + + @Override + public Set<Replica> getRemoteReplicas(String node) { + return Collections.emptySet(); + } + + @Override + public NoReplicationStrategy from(Cluster cluster) { + return new NoReplicationStrategy(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java index bd77778..267a22d 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java @@ -36,11 +36,13 @@ public class Replica { UNKNOWN } - final Node node; + private final Node node; private ReplicaState state = ReplicaState.UNKNOWN; public Replica(Node node) { - this.node = node; + this.node = new Node(); + this.node.setId(node.getId()); + this.node.setClusterIp(node.getClusterIp()); } public ReplicaState getState() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java new file mode 100644 index 0000000..b61b38a --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.replication; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.asterix.event.schema.cluster.Cluster; + +public class ReplicationStrategyFactory { + + private static final Map<String, Class<? extends IReplicationStrategy>> + BUILT_IN_REPLICATION_STRATEGY = new HashMap<>(); + + static { + BUILT_IN_REPLICATION_STRATEGY.put("no_replication", NoReplicationStrategy.class); + BUILT_IN_REPLICATION_STRATEGY.put("chained_declustering", ChainedDeclusteringReplicationStrategy.class); + BUILT_IN_REPLICATION_STRATEGY.put("metadata_only", MetadataOnlyReplicationStrategy.class); + } + + private ReplicationStrategyFactory() { + throw new AssertionError(); + } + + public static IReplicationStrategy create(Cluster cluster) { + boolean highAvailabilityEnabled = cluster.getHighAvailability() != null + && cluster.getHighAvailability().getEnabled() != null + && Boolean.valueOf(cluster.getHighAvailability().getEnabled()); + + if (!highAvailabilityEnabled || cluster.getHighAvailability().getDataReplication() == null + || cluster.getHighAvailability().getDataReplication().getStrategy() == null) { + return new NoReplicationStrategy(); + } + String strategyName = cluster.getHighAvailability().getDataReplication().getStrategy().toLowerCase(); + if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) { + throw new IllegalArgumentException(String.format("Unsupported Replication Strategy. Available types: %s", + BUILT_IN_REPLICATION_STRATEGY.keySet().toString())); + } + Class<? 
extends IReplicationStrategy> clazz = BUILT_IN_REPLICATION_STRATEGY.get(strategyName); + try { + return clazz.newInstance().from(cluster); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java new file mode 100644 index 0000000..ca6968f --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.storage; + +import java.io.File; + +import org.apache.asterix.common.utils.StoragePathUtil; + +/** + * A holder class for an index file properties. 
+ */ +public class IndexFileProperties { + + private final String fileName; + private final String idxName; + private final String dataverseName; + private final int partitionId; + private final int datasetId; + + public IndexFileProperties(int partitionId, String dataverseName, String idxName, String fileName, int datasetId) { + this.partitionId = partitionId; + this.dataverseName = dataverseName; + this.idxName = idxName; + this.fileName = fileName; + this.datasetId = datasetId; + } + + public String getFileName() { + return fileName; + } + + public String getIdxName() { + return idxName; + } + + public String getDataverseName() { + return dataverseName; + } + + public int getPartitionId() { + return partitionId; + } + + public int getDatasetId() { + return datasetId; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(StoragePathUtil.PARTITION_DIR_PREFIX + partitionId + File.separator); + sb.append(dataverseName + File.separator); + sb.append(idxName + File.separator); + sb.append(fileName); + sb.append(" [Dataset ID: " + datasetId + "]"); + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index 6816116..3e85276 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -34,6 +34,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IRecoveryManager { public enum SystemState 
{ + INITIAL_RUN, NEW_UNIVERSE, RECOVERING, HEALTHY, @@ -120,4 +121,6 @@ public interface IRecoveryManager { * Deletes all temporary recovery files */ public void deleteRecoveryTemporaryFiles(); + + void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 9f2e3e7..46cd476 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -78,6 +78,17 @@ public class StoragePathUtil { } /** + * @param fileAbsolutePath + * @return the file relative path starting from the partition directory + */ + public static String getIndexFileRelativePath(String fileAbsolutePath) { + String[] tokens = fileAbsolutePath.split(File.separator); + //partition/dataverse/idx/fileName + return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator + + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1]; + } + + /** * Create a file * Note: this method is not thread safe. 
It is the responsibility of the caller to ensure no path conflict when * creating files simultaneously http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd index 79c377a..098b4e7 100644 --- a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd +++ b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd @@ -44,7 +44,7 @@ <xs:element name="http_port" type="xs:integer" /> <xs:element name="debug_port" type="xs:integer" /> <xs:element name="metadata_node" type="xs:string" /> - <xs:element name="enabled" type="xs:boolean" /> + <xs:element name="enabled" type="xs:string" /> <xs:element name="replication_port" type="xs:integer" /> <xs:element name="replication_factor" type="xs:integer" /> <xs:element name="auto_failover" type="xs:boolean" /> @@ -57,6 +57,8 @@ <xs:element name="result_time_to_live" type="xs:long" /> <xs:element name="result_sweep_threshold" type="xs:long" /> <xs:element name="cc_root" type="xs:string" /> + <xs:element name="strategy" type="xs:string" /> + <xs:element name="node_id" type="xs:string" /> <!-- definition of complex elements --> <xs:element name="working_dir"> @@ -87,15 +89,33 @@ <xs:element name="data_replication"> <xs:complexType> <xs:sequence> - <xs:element ref="cl:enabled" /> + <xs:element ref="cl:strategy" /> <xs:element ref="cl:replication_port" /> <xs:element ref="cl:replication_factor" /> - <xs:element ref="cl:auto_failover" /> <xs:element ref="cl:replication_time_out" /> </xs:sequence> </xs:complexType> </xs:element> + <xs:element name="fault_tolerance"> + <xs:complexType> + <xs:sequence> + <xs:element ref="cl:strategy" /> + <xs:element ref="cl:replica" minOccurs="0"/> + </xs:sequence> + </xs:complexType> + </xs:element> + + <xs:element 
name="high_availability"> + <xs:complexType> + <xs:sequence> + <xs:element ref="cl:enabled" minOccurs="0"/> + <xs:element ref="cl:data_replication" minOccurs="0"/> + <xs:element ref="cl:fault_tolerance" minOccurs="0"/> + </xs:sequence> + </xs:complexType> + </xs:element> + <xs:element name="property"> <xs:complexType> <xs:sequence> @@ -136,6 +156,14 @@ </xs:complexType> </xs:element> + <xs:element name="replica"> + <xs:complexType> + <xs:sequence> + <xs:element ref="cl:node_id" maxOccurs="unbounded" /> + </xs:sequence> + </xs:complexType> + </xs:element> + <xs:element name="cluster"> <xs:complexType> <xs:sequence> @@ -150,7 +178,7 @@ <xs:element ref="cl:iodevices" minOccurs="0" /> <xs:element ref="cl:working_dir" /> <xs:element ref="cl:metadata_node" /> - <xs:element ref="cl:data_replication" minOccurs="0" /> + <xs:element ref="cl:high_availability" minOccurs="0" /> <xs:element ref="cl:master_node" /> <xs:element ref="cl:node" maxOccurs="unbounded" /> <xs:element ref="cl:substitute_nodes" /> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java index 288a739..8213213 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java @@ -76,7 +76,6 @@ import org.apache.http.impl.client.StandardHttpRequestRetryHandler; import org.apache.http.util.EntityUtils; import org.apache.hyracks.util.StorageUtil; import org.junit.Assert; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import 
com.fasterxml.jackson.databind.JsonMappingException; @@ -1108,6 +1107,21 @@ public class TestExecutor { } ProcessBuilder pb = new ProcessBuilder("kill", "-9", Integer.toString(nodePid)); pb.start().waitFor(); + // Delete NC's transaction logs to re-initialize it as a new NC. + deleteNCTxnLogs(nodeId, cUnit); + } + + private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws Exception { + OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit); + String endpoint = "/admin/cluster"; + InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://" + host + ":" + port + endpoint)); + StringWriter actual = new StringWriter(); + IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8); + String config = actual.toString(); + ObjectMapper om = new ObjectMapper(); + String logDir = om.readTree(config).findPath("transaction.log.dirs").get(nodeId).asText(); + ProcessBuilder pb = new ProcessBuilder("rm", "-rf", logDir); + pb.start().waitFor(); } public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java index 6d2b4b9..eac7586 100644 --- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java +++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java @@ -44,7 +44,8 @@ public class ConfigureCommand extends AbstractCommand { @Override protected void execCommand() throws Exception { configureCluster("local", "local.xml"); - configureCluster("local", "local_with_replication.xml"); + 
configureCluster("local", "local_chained_declustering_rep.xml"); + configureCluster("local", "local_metadata_only_rep.xml"); configureCluster("demo", "demo.xml"); String installerConfPath = InstallerDriver.getManagixHome() + File.separator + InstallerDriver.MANAGIX_CONF_XML; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java index 4037eaf..748d811 100644 --- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java +++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java @@ -262,40 +262,16 @@ public class ValidateCommand extends AbstractCommand { boolean valid = true; //if replication is disabled, no need to validate the settings - if (cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) { - - if (cluster.getDataReplication().getReplicationFactor() == null) { - if (cluster.getNode().size() >= 3) { - LOGGER.warn("Replication factor not defined. Using default value (3) " + WARNING); - - } else { - valid = false; - LOGGER.fatal("Replication factor not defined for data repliaction. " + ERROR); - } - - } - - //replication factor = 1 means no replication - if (cluster.getDataReplication().getReplicationFactor().intValue() == 1) { - LOGGER.warn("Replication factor is set to 1. 
Disabling data replication" + WARNING); - return true; - } - - if (cluster.getDataReplication().getReplicationFactor().intValue() > cluster.getNode().size()) { - LOGGER.fatal("Replication factor = " + cluster.getDataReplication().getReplicationFactor().intValue() - + " requires at least " + cluster.getDataReplication().getReplicationFactor().intValue() - + " nodes in the cluster" + ERROR); - valid = false; - } - - if (cluster.getDataReplication().getReplicationPort() == null - || cluster.getDataReplication().getReplicationPort().toString().length() == 0) { + if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getDataReplication() != null) { + if (cluster.getHighAvailability().getDataReplication().getReplicationPort() == null || cluster + .getHighAvailability().getDataReplication().getReplicationPort().toString().length() == 0) { valid = false; LOGGER.fatal("Replication data port not defined for data repliaction. " + ERROR); } - if (cluster.getDataReplication().getReplicationTimeOut() == null - || (cluster.getDataReplication().getReplicationTimeOut().intValue() + "").length() == 0) { + if (cluster.getHighAvailability().getDataReplication().getReplicationTimeOut() == null || String + .valueOf(cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue()) + .length() == 0) { LOGGER.warn("Replication maximum wait time not defined. Using default value (60 seconds) " + WARNING); }
