http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java new file mode 100644 index 0000000..e6638e8 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import org.apache.asterix.app.nc.task.BindMetadataNodeTask; +import org.apache.asterix.app.nc.task.CheckpointTask; +import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; +import org.apache.asterix.app.nc.task.LocalRecoveryTask; +import org.apache.asterix.app.nc.task.MetadataBootstrapTask; +import org.apache.asterix.app.nc.task.RemoteRecoveryTask; +import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; +import org.apache.asterix.app.nc.task.StartReplicationServiceTask; +import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; +import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage; +import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage; +import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; +import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.replication.Replica; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import 
org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.util.FaultToleranceUtil; +import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrategy { + + private static final Logger LOGGER = Logger.getLogger(MetadataNodeFaultToleranceStrategy.class.getName()); + private IClusterStateManager clusterManager; + private String metadataNodeId; + private IReplicationStrategy replicationStrategy; + private ICCMessageBroker messageBroker; + private final Set<String> hotStandbyMetadataReplica = new HashSet<>(); + private final Set<String> failedNodes = new HashSet<>(); + private Set<String> pendingStartupCompletionNodes = new HashSet<>(); + + @Override + public synchronized void notifyNodeJoin(String nodeId) throws HyracksDataException { + pendingStartupCompletionNodes.add(nodeId); + } + + @Override + public synchronized void notifyNodeFailure(String nodeId) throws HyracksDataException { + failedNodes.add(nodeId); + hotStandbyMetadataReplica.remove(nodeId); + clusterManager.updateNodePartitions(nodeId, false); + if (nodeId.equals(metadataNodeId)) { + clusterManager.updateMetadataNode(metadataNodeId, false); + } + clusterManager.refreshState(); + if (replicationStrategy.isParticipant(nodeId)) { + // Notify impacted replica + FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE, clusterManager, + messageBroker, replicationStrategy); + } + // If the failed node is the metadata node, ask its replicas to replay any committed jobs + if (nodeId.equals(metadataNodeId)) { + int metadataPartitionId = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition() + .getPartitionId(); + Set<Integer> metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId)); + Set<Replica> activeRemoteReplicas = 
replicationStrategy.getRemoteReplicas(metadataNodeId).stream() + .filter(replica -> !failedNodes.contains(replica.getId())).collect(Collectors.toSet()); + //TODO Do election to identity the node with latest state + for (Replica replica : activeRemoteReplicas) { + ReplayPartitionLogsRequestMessage msg = new ReplayPartitionLogsRequestMessage(metadataPartition); + try { + messageBroker.sendApplicationMessageToNC(msg, replica.getId()); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e); + continue; + } + } + } + } + + @Override + public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) { + MetadataNodeFaultToleranceStrategy ft = new MetadataNodeFaultToleranceStrategy(); + ft.replicationStrategy = replicationStrategy; + ft.messageBroker = messageBroker; + return ft; + } + + @Override + public synchronized void process(INCLifecycleMessage message) throws HyracksDataException { + switch (message.getType()) { + case STARTUP_TASK_REQUEST: + process((StartupTaskRequestMessage) message); + break; + case STARTUP_TASK_RESULT: + process((NCLifecycleTaskReportMessage) message); + break; + case REPLAY_LOGS_RESPONSE: + process((ReplayPartitionLogsResponseMessage) message); + break; + default: + throw new HyracksDataException("Unsupported message type: " + message.getType().name()); + } + } + + @Override + public synchronized void bindTo(IClusterStateManager clusterManager) { + this.clusterManager = clusterManager; + this.metadataNodeId = clusterManager.getCurrentMetadataNodeId(); + } + + private synchronized void process(ReplayPartitionLogsResponseMessage msg) { + hotStandbyMetadataReplica.add(msg.getNodeId()); + LOGGER.log(Level.INFO, "Hot Standby Metadata Replicas: " + hotStandbyMetadataReplica); + } + + private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException { + final String nodeId = msg.getNodeId(); + final SystemState state = 
msg.getState(); + final boolean isParticipant = replicationStrategy.isParticipant(nodeId); + List<INCLifecycleTask> tasks; + if (!isParticipant) { + tasks = buildNonParticipantStartupSequence(nodeId, state); + } else { + tasks = buildParticipantStartupSequence(nodeId, state); + } + StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + try { + messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); + } catch (Exception e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + private synchronized void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { + final String nodeId = msg.getNodeId(); + pendingStartupCompletionNodes.remove(nodeId); + if (msg.isSuccess()) { + // If this node failed and recovered, notify impacted replicas to reconnect to it + if (replicationStrategy.isParticipant(nodeId) && failedNodes.remove(nodeId)) { + FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager, + messageBroker, replicationStrategy); + } + clusterManager.updateNodePartitions(msg.getNodeId(), true); + if (msg.getNodeId().equals(metadataNodeId)) { + clusterManager.updateMetadataNode(metadataNodeId, true); + // When metadata node is active, it is the only hot standby replica + hotStandbyMetadataReplica.clear(); + hotStandbyMetadataReplica.add(metadataNodeId); + } + clusterManager.refreshState(); + } else { + LOGGER.log(Level.SEVERE, msg.getNodeId() + " failed to complete startup. 
", msg.getException()); + } + } + + private List<INCLifecycleTask> buildNonParticipantStartupSequence(String nodeId, SystemState state) { + final List<INCLifecycleTask> tasks = new ArrayList<>(); + if (state == SystemState.CORRUPTED) { + //need to perform local recovery for node partitions + LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId)) + .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); + tasks.add(rt); + } + tasks.add(new ExternalLibrarySetupTask(false)); + tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new CheckpointTask()); + tasks.add(new StartLifecycleComponentsTask()); + return tasks; + } + + private List<INCLifecycleTask> buildParticipantStartupSequence(String nodeId, SystemState state) { + final List<INCLifecycleTask> tasks = new ArrayList<>(); + switch (state) { + case NEW_UNIVERSE: + // If the metadata node (or replica) failed and lost its data + // => Metadata Remote Recovery from standby replica + tasks.add(getMetadataPartitionRecoveryPlan()); + // Checkpoint after remote recovery to move node to HEALTHY state + tasks.add(new CheckpointTask()); + break; + case CORRUPTED: + // If the metadata node (or replica) failed and started again without losing data => Local Recovery + LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId)) + .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); + tasks.add(rt); + break; + case INITIAL_RUN: + case HEALTHY: + case RECOVERING: + break; + default: + break; + } + tasks.add(new StartReplicationServiceTask()); + final boolean isMetadataNode = nodeId.equals(metadataNodeId); + if (isMetadataNode) { + tasks.add(new MetadataBootstrapTask()); + } + tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); + tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new CheckpointTask()); + tasks.add(new StartLifecycleComponentsTask()); + if (isMetadataNode) { + tasks.add(new 
BindMetadataNodeTask(true)); + } + return tasks; + } + + private RemoteRecoveryTask getMetadataPartitionRecoveryPlan() { + if (hotStandbyMetadataReplica.isEmpty()) { + throw new IllegalStateException("No metadata replicas to recover from"); + } + // Construct recovery plan: Node => Set of partitions to recover from it + Map<String, Set<Integer>> recoveryPlan = new HashMap<>(); + // Recover metadata partition from any metadata hot standby replica + int metadataPartitionId = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition() + .getPartitionId(); + Set<Integer> metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId)); + recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), metadataPartition); + return new RemoteRecoveryTask(recoveryPlan); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java new file mode 100644 index 0000000..0ee4f6a --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import org.apache.asterix.app.nc.task.BindMetadataNodeTask; +import org.apache.asterix.app.nc.task.CheckpointTask; +import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; +import org.apache.asterix.app.nc.task.LocalRecoveryTask; +import org.apache.asterix.app.nc.task.MetadataBootstrapTask; +import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; +import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; +import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; +import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { + + private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName()); + IClusterStateManager clusterManager; + private String metadataNodeId; + private ICCMessageBroker messageBroker; + private Set<String> pendingStartupCompletionNodes = new HashSet<>(); + + @Override + public 
void notifyNodeJoin(String nodeId) throws HyracksDataException { + pendingStartupCompletionNodes.add(nodeId); + } + + @Override + public void notifyNodeFailure(String nodeId) throws HyracksDataException { + pendingStartupCompletionNodes.remove(nodeId); + clusterManager.updateNodePartitions(nodeId, false); + if (nodeId.equals(metadataNodeId)) { + clusterManager.updateMetadataNode(metadataNodeId, false); + } + clusterManager.refreshState(); + } + + @Override + public void process(INCLifecycleMessage message) throws HyracksDataException { + switch (message.getType()) { + case STARTUP_TASK_REQUEST: + process((StartupTaskRequestMessage) message); + break; + case STARTUP_TASK_RESULT: + process((NCLifecycleTaskReportMessage) message); + break; + default: + throw new HyracksDataException("Unsupported message type: " + message.getType().name()); + } + } + + @Override + public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) { + NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy(); + ft.messageBroker = messageBroker; + return ft; + } + + @Override + public void bindTo(IClusterStateManager clusterManager) { + this.clusterManager = clusterManager; + metadataNodeId = clusterManager.getCurrentMetadataNodeId(); + } + + private void process(StartupTaskRequestMessage msg) throws HyracksDataException { + final String nodeId = msg.getNodeId(); + List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState()); + StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + try { + messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); + } catch (Exception e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { + pendingStartupCompletionNodes.remove(msg.getNodeId()); + if (msg.isSuccess()) { + clusterManager.updateNodePartitions(msg.getNodeId(), true); + if 
(msg.getNodeId().equals(metadataNodeId)) { + clusterManager.updateMetadataNode(metadataNodeId, true); + } + clusterManager.refreshState(); + } else { + LOGGER.log(Level.SEVERE, msg.getNodeId() + " failed to complete startup. ", msg.getException()); + } + } + + private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) { + final List<INCLifecycleTask> tasks = new ArrayList<>(); + if (state == SystemState.CORRUPTED) { + //need to perform local recovery for node partitions + LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId)) + .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); + tasks.add(rt); + } + final boolean isMetadataNode = nodeId.equals(metadataNodeId); + if (isMetadataNode) { + tasks.add(new MetadataBootstrapTask()); + } + tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); + tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new CheckpointTask()); + tasks.add(new StartLifecycleComponentsTask()); + if (isMetadataNode) { + tasks.add(new BindMetadataNodeTask(true)); + } + return tasks; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java new file mode 100644 index 0000000..1abc3f0 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.app.replication;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;

/**
 * Tracks the failback of partitions to the failing back node {@link #getNodeId()}.
 * <p>
 * The plan records which partitions are taken back from which currently active node, which nodes participate,
 * which prepare requests still await a response, and the resulting {@link FailbackPlanState}. Mutable per-plan
 * state is guarded by this instance's monitor. Plan ids come from a class-wide counter guarded by the class
 * monitor.
 */
public class NodeFailbackPlan {

    public enum FailbackPlanState {
        /**
         * Initial state while selecting the nodes that will participate
         * in the node failback plan.
         */
        PREPARING,
        /**
         * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added,
         * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate
         * a response is expected and need to wait for it.
         */
        PENDING_PARTICIPANT_REPONSE,
        /**
         * Upon receiving the last {@code PreparePartitionsFailbackResponseMessage} response,
         * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate
         * the need to send {@link CompleteFailbackRequestMessage} to the failing back node.
         */
        PENDING_COMPLETION,
        /**
         * If any of the participants fail or the failing back node itself fails during
         * any of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION),
         * the state is changed to FAILED.
         */
        FAILED,
        /**
         * If the state is FAILED, and all pending responses (if any) have been received,
         * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert
         * the effects of this plan (if any).
         */
        PENDING_ROLLBACK
    }

    // guarded by NodeFailbackPlan.class (see createPlan)
    private static long planIdGenerator = 0;
    private final long planId;
    private final String nodeId;
    private final Set<String> participants;
    // partition id => node currently holding it that must give it back
    private final Map<Integer, String> partition2nodeMap;
    private String nodeToReleaseMetadataManager;
    private int requestId;
    private final Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests;
    private FailbackPlanState state;

    /**
     * Creates a new plan in state PREPARING for the failing back node {@code nodeId}.
     * Synchronized on the class so concurrent callers never receive the same plan id.
     */
    public static synchronized NodeFailbackPlan createPlan(String nodeId) {
        return new NodeFailbackPlan(planIdGenerator++, nodeId);
    }

    private NodeFailbackPlan(long planId, String nodeId) {
        this.planId = planId;
        this.nodeId = nodeId;
        participants = new HashSet<>();
        partition2nodeMap = new HashMap<>();
        pendingRequests = new HashMap<>();
        state = FailbackPlanState.PREPARING;
    }

    public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) {
        partition2nodeMap.put(partitionId, currentActiveNode);
    }

    public synchronized void addParticipant(String nodeId) {
        participants.add(nodeId);
    }

    /**
     * Updates the plan after {@code failedNode} failed.
     * <ul>
     * <li>Participant failure while PREPARING: the plan becomes FAILED.</li>
     * <li>Participant failure while PENDING_PARTICIPANT_REPONSE: if the participant had pending requests, the plan
     * becomes FAILED and those requests are treated as completed, which moves the plan to PENDING_ROLLBACK once no
     * requests remain pending.</li>
     * <li>Failure of the failing back node itself: the plan becomes FAILED and moves to PENDING_ROLLBACK as soon as
     * no requests are pending.</li>
     * </ul>
     * NOTE(review): a participant failure in any other state (e.g. PENDING_COMPLETION) is ignored here.
     */
    public synchronized void notifyNodeFailure(String failedNode) {
        if (participants.contains(failedNode)) {
            if (state == FailbackPlanState.PREPARING) {
                state = FailbackPlanState.FAILED;
            } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) {
                // pending requests sent to the failed node will never be answered:
                // mark them as completed and fail the plan
                Set<Integer> failedRequests = new HashSet<>();
                for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) {
                    if (request.getNodeID().equals(failedNode)) {
                        failedRequests.add(request.getRequestId());
                    }
                }
                if (!failedRequests.isEmpty()) {
                    state = FailbackPlanState.FAILED;
                    for (Integer failedRequestId : failedRequests) {
                        markRequestCompleted(failedRequestId);
                    }
                }
            }
        } else if (nodeId.equals(failedNode)) {
            //if the failing back node is the failed node itself
            state = FailbackPlanState.FAILED;
            updateState();
        }
    }

    public synchronized Set<Integer> getPartitionsToFailback() {
        return new HashSet<>(partition2nodeMap.keySet());
    }

    /**
     * Registers a prepare request that awaits a response. The first request moves a PREPARING plan to
     * PENDING_PARTICIPANT_REPONSE; any other state (notably FAILED) is kept so an earlier failure is not lost and
     * the plan can still reach PENDING_ROLLBACK once all responses arrive.
     */
    public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
        if (pendingRequests.isEmpty() && state == FailbackPlanState.PREPARING) {
            state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
        }
        pendingRequests.put(msg.getRequestId(), msg);
    }

    public synchronized void markRequestCompleted(int requestId) {
        pendingRequests.remove(requestId);
        updateState();
    }

    // Advances the state once no requests are pending; caller must hold this instance's monitor.
    private void updateState() {
        if (pendingRequests.isEmpty()) {
            switch (state) {
                case PREPARING:
                case FAILED:
                    state = FailbackPlanState.PENDING_ROLLBACK;
                    break;
                case PENDING_PARTICIPANT_REPONSE:
                    state = FailbackPlanState.PENDING_COMPLETION;
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Builds one prepare request per participant, listing the partitions that participant currently holds for
     * this plan. Each request receives a new request id; the participant chosen to release the metadata manager
     * gets its request flagged accordingly.
     */
    public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() {
        Set<PreparePartitionsFailbackRequestMessage> requests = new HashSet<>();
        for (String participant : participants) {
            Set<Integer> partitionToPrepareForFailback = new HashSet<>();
            for (Map.Entry<Integer, String> entry : partition2nodeMap.entrySet()) {
                if (entry.getValue().equals(participant)) {
                    partitionToPrepareForFailback.add(entry.getKey());
                }
            }
            PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId,
                    requestId++, participant, partitionToPrepareForFailback);
            if (participant.equals(nodeToReleaseMetadataManager)) {
                msg.setReleaseMetadataNode(true);
            }
            requests.add(msg);
        }
        return requests;
    }

    public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() {
        return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback());
    }

    public String getNodeId() {
        return nodeId;
    }

    public long getPlanId() {
        return planId;
    }

    // synchronized because the field is read under the monitor in getPlanFailbackRequests
    public synchronized void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) {
        this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
    }

    public synchronized FailbackPlanState getState() {
        return state;
    }

    @Override
    public synchronized String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("Plan ID: ").append(planId);
        sb.append(" Failing back node: ").append(nodeId);
        sb.append(" Participants: ").append(participants);
        sb.append(" Partitions to Failback: ").append(partition2nodeMap.keySet());
        return sb.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java new file mode 100644 index
0000000..2d423f9 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+package org.apache.asterix.app.replication.message;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Asks node {@code nodeId} to complete the failback process for {@code partitions} within failback plan
+ * {@code planId}.
+ * <p>
+ * When handled on a node controller, the node calls {@link IRemoteRecoveryManager#completeFailbackProcess()}.
+ * It then always replies to the CC with a {@link CompleteFailbackResponseMessage}, even if completion failed.
+ * Any failure is rethrown as a {@link HyracksDataException} once the reply has been attempted.
+ */
+public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(CompleteFailbackRequestMessage.class.getName());
+    private final Set<Integer> partitions;
+    private final String nodeId;
+
+    public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeId = nodeId;
+        this.partitions = partitions;
+    }
+
+    /** @return the partitions whose failback should be completed */
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    /** @return the id of the node that is failing back */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Partitions: " + partitions);
+        return sb.toString();
+    }
+
+    /**
+     * Completes the failback process on this node and reports back to the CC.
+     *
+     * @param cs the node controller service handling this message
+     * @throws HyracksDataException if completing the failback or sending the response failed; a failure to
+     *             send is added as a suppressed exception when both fail
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAppRuntimeContext appContext =
+                (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        HyracksDataException hde = null;
+        // set when an InterruptedException is converted, so the interrupt status can be restored afterwards
+        boolean interrupted = false;
+        try {
+            IRemoteRecoveryManager remoteRecoveryManager = appContext.getRemoteRecoveryManager();
+            remoteRecoveryManager.completeFailbackProcess();
+        } catch (IOException | InterruptedException e) {
+            LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e);
+            interrupted = e instanceof InterruptedException;
+            hde = ExceptionUtils.convertToHyracksDataException(e);
+        } finally {
+            // the response is sent whether or not completion succeeded
+            CompleteFailbackResponseMessage response = new CompleteFailbackResponseMessage(planId,
+                    requestId, partitions);
+            try {
+                broker.sendMessageToCC(response);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
+                interrupted |= e instanceof InterruptedException;
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            }
+        }
+        if (interrupted) {
+            // restored only after the send so the pending interrupt cannot abort the response
+            Thread.currentThread().interrupt();
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.COMPLETE_FAILBACK_REQUEST;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java new file mode 100644 index 0000000..fb45892 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +import java.util.Set; + +public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage { + + private static final long serialVersionUID = 1L; + private final Set<Integer> partitions; + + public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) { + super(planId, requestId); + this.partitions = partitions; + } + + public Set<Integer> getPartitions() { + return partitions; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(CompleteFailbackResponseMessage.class.getSimpleName()); + sb.append(" Plan ID: " + planId); + sb.append(" Partitions: " + partitions); + return sb.toString(); + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + } + + @Override + public MessageType getType() { + return MessageType.COMPLETE_FAILBACK_RESPONSE; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java new file mode 100644 index 0000000..3af075e --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.replication.message; + +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +public class NCLifecycleTaskReportMessage implements INCLifecycleMessage { + + private static final long serialVersionUID = 1L; + private final String nodeId; + private final boolean success; + private Exception exception; + + public NCLifecycleTaskReportMessage(String nodeId, boolean success) { + this.nodeId = nodeId; + this.success = success; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + } + + public String getNodeId() { + return nodeId; + } + + public boolean isSuccess() { + return success; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } + + @Override + public MessageType getType() { + return MessageType.STARTUP_TASK_RESULT; + } +} \ No newline 
at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java new file mode 100644 index 0000000..2104f9c --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import java.rmi.RemoteException; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName()); + private final Set<Integer> partitions; + private boolean releaseMetadataNode = false; + private final String nodeID; + + public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) { + super(planId, requestId); + this.nodeID = nodeId; + this.partitions = partitions; + } + + public Set<Integer> getPartitions() { + return partitions; + } + + public boolean isReleaseMetadataNode() { + return releaseMetadataNode; + } + + public void setReleaseMetadataNode(boolean releaseMetadataNode) { + this.releaseMetadataNode = releaseMetadataNode; + } + + public String getNodeID() { + return nodeID; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName()); + sb.append(" Plan ID: " + planId); + sb.append(" Partitions: " + partitions); + sb.append(" releaseMetadataNode: " + releaseMetadataNode); + return sb.toString(); + } + + @Override + public void 
handle(IControllerService cs) throws HyracksDataException, InterruptedException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext appContext = + (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); + /** + * if the metadata partition will be failed back + * we need to flush and close all datasets including metadata datasets + * otherwise we need to close all non-metadata datasets and flush metadata datasets + * so that their memory components will be copied to the failing back node + */ + if (releaseMetadataNode) { + appContext.getDatasetLifecycleManager().closeAllDatasets(); + //remove the metadata node stub from RMI registry + try { + appContext.unexportMetadataNodeStub(); + } catch (RemoteException e) { + LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } else { + //close all non-metadata datasets + appContext.getDatasetLifecycleManager().closeUserDatasets(); + //flush the remaining metadata datasets that were not closed + appContext.getDatasetLifecycleManager().flushAllDatasets(); + } + + //mark the partitions to be closed as inactive + PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext + .getLocalResourceRepository(); + for (Integer partitionId : partitions) { + localResourceRepo.addInactivePartition(partitionId); + } + + //send response after partitions prepared for failback + PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId, + requestId, partitions); + try { + broker.sendMessageToCC(reponse); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed sending message to cc", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + @Override + public MessageType getType() { + return MessageType.PREPARE_FAILBACK_REQUEST; 
+ } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java new file mode 100644 index 0000000..e02cd42 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +import java.util.Set; + +public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage { + + private static final long serialVersionUID = 1L; + private final Set<Integer> partitions; + + public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) { + super(planId, requestId); + this.partitions = partitions; + } + + public Set<Integer> getPartitions() { + return partitions; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + } + + @Override + public String toString() { + return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString(); + } + + @Override + public MessageType getType() { + return MessageType.PREPARE_FAILBACK_RESPONSE; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java new file mode 100644 index 0000000..96ae8be --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.replication.message; + +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage { + + private static final Logger LOGGER = Logger.getLogger(ReplayPartitionLogsRequestMessage.class.getName()); + private static final long serialVersionUID = 1L; + private final Set<Integer> partitions; + + public ReplayPartitionLogsRequestMessage(Set<Integer> partitions) { + this.partitions = partitions; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + // Replay the logs for 
these partitions and flush any impacted dataset + appContext.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true); + + INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); + ReplayPartitionLogsResponseMessage reponse = new ReplayPartitionLogsResponseMessage(ncs.getId(), partitions); + try { + broker.sendMessageToCC(reponse); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed sending message to cc", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + @Override + public MessageType getType() { + return MessageType.REPLAY_LOGS_REQUEST; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java new file mode 100644 index 0000000..dc19735 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.replication.message; + +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +import java.util.Set; + +public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage { + + private static final long serialVersionUID = 1L; + private final Set<Integer> partitions; + private final String nodeId; + + public ReplayPartitionLogsResponseMessage(String nodeId, Set<Integer> partitions) { + this.partitions = partitions; + this.nodeId = nodeId; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + } + + public Set<Integer> getPartitions() { + return partitions; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public MessageType getType() { + return MessageType.REPLAY_LOGS_RESPONSE; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java new file mode 100644 index 0000000..be42a9d --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class StartupTaskRequestMessage implements INCLifecycleMessage { + + private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName()); + private static final long serialVersionUID = 1L; + private final SystemState state; + private final String nodeId; + + public StartupTaskRequestMessage(String nodeId, SystemState state) { + this.state = state; + this.nodeId = nodeId; + } + + public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException { + try { + StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState); + ((INCMessageBroker) cs.getApplicationContext().getMessageBroker()).sendMessageToCC(msg); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + } + + public SystemState getState() { + return state; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public MessageType getType() { + return MessageType.STARTUP_TASK_REQUEST; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java new file mode 100644 index 0000000..6a72776 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class StartupTaskResponseMessage implements INCLifecycleMessage { + + private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName()); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final List<INCLifecycleTask> tasks; + + public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) { + this.nodeId = nodeId; + this.tasks = tasks; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + NodeControllerService ncs = (NodeControllerService) cs; + INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); + boolean success = true; + HyracksDataException exception = null; + try { + for (INCLifecycleTask task : tasks) { + task.perform(cs); + } + } catch (HyracksDataException e) { + success = false; + exception = e; + } + NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success); + result.setException(exception); + try { + broker.sendMessageToCC(result); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed sending message to cc", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + public String getNodeId() { + return nodeId; + } + + @Override + public MessageType getType() { + return MessageType.STARTUP_TASK_RESPONSE; + } +} \ No newline at end 
of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java new file mode 100644 index 0000000..8ce12dd --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName()); + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext appContext = + (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); + HyracksDataException hde = null; + try { + appContext.initializeMetadata(false); + appContext.exportMetadataNodeStub(); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed taking over metadata", e); + hde = new HyracksDataException(e); + } finally { + TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage( + appContext.getTransactionSubsystem().getId()); + try { + broker.sendMessageToCC(reponse); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed taking over metadata", e); + hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); + } + } + if (hde != null) { + throw hde; + } + } + + @Override + public String toString() { + return TakeoverMetadataNodeRequestMessage.class.getSimpleName(); + } + + @Override + public MessageType getType() { + return 
MessageType.TAKEOVER_METADATA_NODE_REQUEST; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java new file mode 100644 index 0000000..428047c --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +public class TakeoverMetadataNodeResponseMessage implements INCLifecycleMessage { + + private static final long serialVersionUID = 1L; + private final String nodeId; + + public TakeoverMetadataNodeResponseMessage(String nodeId) { + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + } + + @Override + public String toString() { + return TakeoverMetadataNodeResponseMessage.class.getSimpleName(); + } + + @Override + public MessageType getType() { + return MessageType.TAKEOVER_METADATA_NODE_RESPONSE; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java new file mode 100644 index 0000000..4e415de --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.replication.message; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.common.replication.IRemoteRecoveryManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName()); + private final Integer[] partitions; + private final long requestId; + private final String nodeId; + + public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) { + this.requestId = requestId; + this.nodeId = nodeId; + this.partitions = partitionsToTakeover; + } + + public Integer[] getPartitions() { + return partitions; + } + + public long getRequestId() { + return requestId; + } + + public 
String getNodeId() { + return nodeId; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName()); + sb.append(" Request ID: " + requestId); + sb.append(" Node ID: " + nodeId); + sb.append(" Partitions: "); + for (Integer partitionId : partitions) { + sb.append(partitionId + ","); + } + //remove last comma + sb.charAt(sb.length() - 1); + return sb.toString(); + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + NodeControllerService ncs = (NodeControllerService) cs; + IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); + //if the NC is shutting down, it should ignore takeover partitions request + if (!appContext.isShuttingdown()) { + HyracksDataException hde = null; + try { + IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager(); + remoteRecoeryManager.takeoverPartitons(partitions); + } catch (IOException | ACIDException e) { + LOGGER.log(Level.SEVERE, "Failure taking over partitions", e); + hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); + } finally { + //send response after takeover is completed + TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId, + appContext.getTransactionSubsystem().getId(), partitions); + try { + broker.sendMessageToCC(reponse); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failure taking over partitions", e); + hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); + } + } + if (hde != null) { + throw hde; + } + } + } + + @Override + public MessageType getType() { + return MessageType.TAKEOVER_PARTITION_REQUEST; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java new file mode 100644 index 0000000..e653a64 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication.message; + +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +public class TakeoverPartitionsResponseMessage implements INCLifecycleMessage { + + private static final long serialVersionUID = 1L; + private final Integer[] partitions; + private final String nodeId; + private final long requestId; + + public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) { + this.requestId = requestId; + this.nodeId = nodeId; + this.partitions = partitionsToTakeover; + } + + public Integer[] getPartitions() { + return partitions; + } + + public String getNodeId() { + return nodeId; + } + + public long getRequestId() { + return requestId; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { + AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this); + } + + @Override + public String toString() { + return TakeoverPartitionsResponseMessage.class.getSimpleName(); + } + + @Override + public MessageType getType() { + return MessageType.TAKEOVER_PARTITION_RESPONSE; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index 62eb250..f3182cf 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -18,41 +18,23 @@ */ package org.apache.asterix.hyracks.bootstrap; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; - -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - import org.apache.asterix.active.ActiveLifecycleListener; -import org.apache.asterix.api.http.server.ApiServlet; -import org.apache.asterix.api.http.server.ClusterApiServlet; -import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; -import org.apache.asterix.api.http.server.ConnectorApiServlet; -import org.apache.asterix.api.http.server.DdlApiServlet; -import org.apache.asterix.api.http.server.DiagnosticsApiServlet; -import org.apache.asterix.api.http.server.FeedServlet; -import org.apache.asterix.api.http.server.FullApiServlet; -import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet; -import org.apache.asterix.api.http.server.QueryApiServlet; -import org.apache.asterix.api.http.server.QueryResultApiServlet; -import org.apache.asterix.api.http.server.QueryServiceServlet; -import org.apache.asterix.api.http.server.QueryStatusApiServlet; -import org.apache.asterix.api.http.server.QueryWebInterfaceServlet; -import org.apache.asterix.api.http.server.ShutdownApiServlet; -import org.apache.asterix.api.http.server.UpdateApiServlet; -import org.apache.asterix.api.http.server.VersionApiServlet; +import org.apache.asterix.api.http.server.*; import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.cc.ResourceIdManager; import org.apache.asterix.app.external.ExternalLibraryUtils; +import org.apache.asterix.app.replication.FaultToleranceStrategyFactory; import 
org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.config.AsterixExtension; +import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.file.StorageComponentProvider; @@ -70,13 +52,19 @@ import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; -import org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.http.api.IServlet; import org.apache.hyracks.http.server.HttpServer; import org.apache.hyracks.http.server.WebManager; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR; +import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; + public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName()); @@ -90,7 +78,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { @Override public void start(ICCApplicationContext ccAppCtx, 
String[] args) throws Exception { final ClusterControllerService controllerService = (ClusterControllerService) ccAppCtx.getControllerService(); - IMessageBroker messageBroker = new CCMessageBroker(controllerService); + ICCMessageBroker messageBroker = new CCMessageBroker(controllerService); this.appCtx = ccAppCtx; if (LOGGER.isLoggable(Level.INFO)) { @@ -100,11 +88,14 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { appCtx.setThreadFactory(new AsterixThreadFactory(appCtx.getThreadFactory(), new LifeCycleComponentManager())); ILibraryManager libraryManager = new ExternalLibraryManager(); ResourceIdManager resourceIdManager = new ResourceIdManager(); + IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); + IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory + .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, messageBroker); ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); componentProvider = new StorageComponentProvider(); GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection(), componentProvider); AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), libraryManager, resourceIdManager, - () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance()); + () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy); ccExtensionManager = new CCExtensionManager(getExtensions()); AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager); final CCConfig ccConfig = controllerService.getCCConfig();
