http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java index 0dcdc7b..fcc997f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java @@ -27,13 +27,14 @@ import java.util.Map.Entry; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.replication.IReplicationManager; @@ -41,6 +42,7 @@ import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.replication.storage.ReplicaResourcesManager; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class RemoteRecoveryManager implements IRemoteRecoveryManager { @@ -61,8 +63,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { //1. identify which replicas reside in this node String localNodeId = runtimeContext.getTransactionSubsystem().getId(); - Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId); - + Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId); Map<String, Set<String>> recoveryCandidates = new HashMap<>(); Map<String, Integer> candidatesScore = new HashMap<>(); @@ -124,16 +125,9 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { } @Override - public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException { - /** - * TODO even though the takeover is always expected to succeed, - * in case of any failure during the takeover, the CC should be - * notified that the takeover failed. - */ - Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions)); + public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException { ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager(); - - long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitionsToTakeover); + long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions); long readableSmallestLSN = logManager.getReadableSmallestLSN(); if (minLSN < readableSmallestLSN) { minLSN = readableSmallestLSN; @@ -141,7 +135,25 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { //replay logs > minLSN that belong to these partitions IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager(); - recoveryManager.replayPartitionsLogs(partitionsToTakeover, logManager.getLogReader(true), minLSN); + try { + recoveryManager.replayPartitionsLogs(partitions, logManager.getLogReader(true), minLSN); + if (flush) { + runtimeContext.getDatasetLifecycleManager().flushAllDatasets(); + } + } catch (IOException | ACIDException e) { + throw new HyracksDataException(e); + } + } + + @Override + public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException { + /** + * TODO even though the takeover is always expected to succeed, + * in case of any failure during the takeover, the CC should be + * notified that the takeover failed. + */ + Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions)); + replayReplicaPartitionLogs(partitionsToTakeover, false); //mark these partitions as active in this node PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext @@ -157,8 +169,9 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext .getLocalResourceRepository(); IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager(); + Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext).getMetadataProperties() + .getNodePartitions(); - failbackRecoveryReplicas = new HashMap<>(); while (true) { //start recovery steps try { @@ -189,10 +202,15 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { /*** Start Recovery Per Lost Replica ***/ for (Entry<String, Set<String>> remoteReplica : failbackRecoveryReplicas.entrySet()) { String replicaId = remoteReplica.getKey(); - Set<String> partitionsToRecover = remoteReplica.getValue(); + Set<String> ncsToRecoverFor = remoteReplica.getValue(); + Set<Integer> partitionsIds = new HashSet<>(); + for (String node : ncsToRecoverFor) { + partitionsIds.addAll((Arrays.asList(nodePartitions.get(node))).stream() + .map(ClusterPartition::getPartitionId).collect(Collectors.toList())); + } //1. Request indexes metadata and LSM components - replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, new HashSet<String>()); + replicationManager.requestReplicaFiles(replicaId, partitionsIds, new HashSet<String>()); } break; } catch (IOException e) { @@ -209,8 +227,8 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager(); ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext .getReplicaResourcesManager(); - Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext) - .getMetadataProperties().getNodePartitions(); + Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext).getMetadataProperties() + .getNodePartitions(); /** * for each lost partition, get the remaining files from replicas @@ -221,17 +239,19 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { String replicaId = remoteReplica.getKey(); Set<String> NCsDataToRecover = remoteReplica.getValue(); Set<String> existingFiles = new HashSet<>(); + Set<Integer> partitionsToRecover = new HashSet<>(); for (String nodeId : NCsDataToRecover) { //get partitions that will be recovered from this node ClusterPartition[] replicaPartitions = nodePartitions.get(nodeId); for (ClusterPartition partition : replicaPartitions) { existingFiles.addAll( replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), true)); + partitionsToRecover.add(partition.getPartitionId()); } } //Request remaining indexes files - replicationManager.requestReplicaFiles(replicaId, NCsDataToRecover, existingFiles); + replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, existingFiles); } } catch (IOException e) { /** @@ -256,4 +276,52 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager { failbackRecoveryReplicas = null; } -} + + //TODO refactor common code between remote recovery and failback process + @Override + public void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) throws HyracksDataException { + int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts(); + PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext + .getLocalResourceRepository(); + IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager(); + ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager(); + while (true) { + //start recovery steps + try { + if (maxRecoveryAttempts <= 0) { + //to avoid infinite loop in case of unexpected behavior. + throw new IllegalStateException("Failed to perform remote recovery."); + } + + /*** Prepare for Recovery ***/ + //1. clean any memory data that could've existed from previous failed recovery attempt + datasetLifeCycleManager.closeAllDatasets(); + + //2. remove any existing storage data and initialize storage metadata + resourceRepository.deleteStorageData(true); + resourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName()); + + /*** Start Recovery Per Lost Replica ***/ + for (Entry<String, Set<Integer>> remoteReplica : recoveryPlan.entrySet()) { + String replicaId = remoteReplica.getKey(); + Set<Integer> partitionsToRecover = remoteReplica.getValue(); + + //Request indexes metadata and LSM components + replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, new HashSet<String>()); + } + + //get max LSN from selected remote replicas + long maxRemoteLSN = replicationManager.getMaxRemoteLSN(recoveryPlan.keySet()); + + //6. force LogManager to start from a partition > maxLSN in selected remote replicas + logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN); + break; + } catch (IOException e) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...", e); + } + maxRecoveryAttempts--; + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index cce3dc4..6a2ebf6 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -38,8 +38,8 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; @@ -285,7 +285,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { partitionFiles.add(file.getAbsolutePath()); } else { partitionFiles.add( - PersistentLocalResourceRepository.getResourceRelativePath(file.getAbsolutePath())); + StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath())); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java index 7c386d5..b2855f6 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java @@ -18,9 +18,9 @@ */ package org.apache.asterix.runtime.message; -import org.apache.asterix.common.messaging.api.IApplicationMessage; +import org.apache.asterix.common.replication.INCLifecycleMessage; -public abstract class AbstractFailbackPlanMessage implements IApplicationMessage { +public abstract class AbstractFailbackPlanMessage implements INCLifecycleMessage { private static final long serialVersionUID = 1L; protected final long planId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java deleted file mode 100644 index 2454800..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.io.IOException; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.exceptions.ExceptionUtils; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.replication.IRemoteRecoveryManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage { - - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(CompleteFailbackRequestMessage.class.getName()); - private final Set<Integer> partitions; - private final String nodeId; - - public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) { - super(planId, requestId); - this.nodeId = nodeId; - this.partitions = partitions; - } - - public Set<Integer> getPartitions() { - return partitions; - } - - public String getNodeId() { - return nodeId; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(CompleteFailbackRequestMessage.class.getSimpleName()); - sb.append(" Plan ID: " + planId); - sb.append(" Node ID: " + nodeId); - sb.append(" Partitions: " + partitions); - return sb.toString(); - } - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - NodeControllerService ncs = (NodeControllerService) cs; - IAppRuntimeContext appContext = - (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); - INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); - HyracksDataException hde = null; - try { - IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager(); - remoteRecoeryManager.completeFailbackProcess(); - } catch (IOException | InterruptedException e) { - LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e); - hde = ExceptionUtils.convertToHyracksDataException(e); - } finally { - CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId, - requestId, partitions); - try { - broker.sendMessageToCC(reponse); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Failure sending message to CC", e); - hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); - } - } - if (hde != null) { - throw hde; - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java deleted file mode 100644 index 07c366c..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.util.Set; - -import org.apache.asterix.runtime.utils.ClusterStateManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage { - - private static final long serialVersionUID = 1L; - private final Set<Integer> partitions; - - public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) { - super(planId, requestId); - this.partitions = partitions; - } - - public Set<Integer> getPartitions() { - return partitions; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(CompleteFailbackResponseMessage.class.getSimpleName()); - sb.append(" Plan ID: " + planId); - sb.append(" Partitions: " + partitions); - return sb.toString(); - } - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java deleted file mode 100644 index 24f5fe8..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public class NodeFailbackPlan { - - public enum FailbackPlanState { - /** - * Initial state while selecting the nodes that will participate - * in the node failback plan. - */ - PREPARING, - /** - * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added, - * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate - * a response is expected and need to wait for it. - */ - PENDING_PARTICIPANT_REPONSE, - /** - * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response, - * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate - * the need to send {@link CompleteFailbackRequestMessage} to the failing back node. - */ - PENDING_COMPLETION, - /** - * if any of the participants fail or the failing back node itself fails during - * and of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION), - * the state is changed to FAILED. - */ - FAILED, - /** - * if the state is FAILED, and all pending responses (if any) have been received, - * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert - * the effects of this plan (if any). - */ - PENDING_ROLLBACK - } - - private static long planIdGenerator = 0; - private long planId; - private final String nodeId; - private final Set<String> participants; - private final Map<Integer, String> partition2nodeMap; - private String nodeToReleaseMetadataManager; - private int requestId; - private Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests; - private FailbackPlanState state; - - public static NodeFailbackPlan createPlan(String nodeId) { - return new NodeFailbackPlan(planIdGenerator++, nodeId); - } - - private NodeFailbackPlan(long planId, String nodeId) { - this.planId = planId; - this.nodeId = nodeId; - participants = new HashSet<>(); - partition2nodeMap = new HashMap<>(); - pendingRequests = new HashMap<>(); - state = FailbackPlanState.PREPARING; - } - - public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) { - partition2nodeMap.put(partitionId, currentActiveNode); - } - - public synchronized void addParticipant(String nodeId) { - participants.add(nodeId); - } - - public synchronized void notifyNodeFailure(String failedNode) { - if (participants.contains(failedNode)) { - if (state == FailbackPlanState.PREPARING) { - state = FailbackPlanState.FAILED; - } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) { - /** - * if there is any pending request from this failed node, - * it should be marked as completed and the plan should be marked as failed - */ - Set<Integer> failedRequests = new HashSet<>(); - for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) { - if (request.getNodeID().equals(failedNode)) { - failedRequests.add(request.getRequestId()); - } - } - - if (!failedRequests.isEmpty()) { - state = FailbackPlanState.FAILED; - for (Integer failedRequestId : failedRequests) { - markRequestCompleted(failedRequestId); - } - } - } - } else if (nodeId.equals(failedNode)) { - //if the failing back node is the failed node itself - state = FailbackPlanState.FAILED; - updateState(); - } - } - - public synchronized Set<Integer> getPartitionsToFailback() { - return new HashSet<>(partition2nodeMap.keySet()); - } - - public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) { - //if this is the first request - if (pendingRequests.size() == 0) { - state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE; - } - pendingRequests.put(msg.getRequestId(), msg); - } - - public synchronized void markRequestCompleted(int requestId) { - pendingRequests.remove(requestId); - updateState(); - } - - private void updateState() { - if (pendingRequests.size() == 0) { - switch (state) { - case PREPARING: - case FAILED: - state = FailbackPlanState.PENDING_ROLLBACK; - break; - case PENDING_PARTICIPANT_REPONSE: - state = FailbackPlanState.PENDING_COMPLETION; - break; - default: - break; - } - } - } - - public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() { - Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new HashSet<>(); - /** - * for each participant, construct a request with the partitions - * that will be failed back or flushed. - */ - for (String participant : participants) { - Set<Integer> partitionToPrepareForFailback = new HashSet<>(); - for (Map.Entry<Integer, String> entry : partition2nodeMap.entrySet()) { - if (entry.getValue().equals(participant)) { - partitionToPrepareForFailback.add(entry.getKey()); - } - } - PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId, - requestId++, participant, partitionToPrepareForFailback); - if (participant.equals(nodeToReleaseMetadataManager)) { - msg.setReleaseMetadataNode(true); - } - node2Partitions.add(msg); - } - return node2Partitions; - } - - public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() { - return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback()); - } - - public String getNodeId() { - return nodeId; - } - - public long getPlanId() { - return planId; - } - - public void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) { - this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager; - } - - public synchronized FailbackPlanState getState() { - return state; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Plan ID: " + planId); - sb.append(" Failing back node: " + nodeId); - sb.append(" Participants: " + participants); - sb.append(" Partitions to Failback: " + partition2nodeMap.keySet()); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java deleted file mode 100644 index 985c741..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.rmi.RemoteException; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.exceptions.ExceptionUtils; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage { - - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName()); - private final Set<Integer> partitions; - private boolean releaseMetadataNode = false; - private final String nodeID; - - public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) { - super(planId, requestId); - this.nodeID = nodeId; - this.partitions = partitions; - } - - public Set<Integer> getPartitions() { - return partitions; - } - - public boolean isReleaseMetadataNode() { - return releaseMetadataNode; - } - - public void setReleaseMetadataNode(boolean releaseMetadataNode) { - this.releaseMetadataNode = releaseMetadataNode; - } - - public String getNodeID() { - return nodeID; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName()); - sb.append(" Plan ID: " + planId); - sb.append(" Partitions: " + partitions); - sb.append(" releaseMetadataNode: " + releaseMetadataNode); - return sb.toString(); - } - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - NodeControllerService ncs = (NodeControllerService) cs; - IAppRuntimeContext appContext = - (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); - INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); - /** - * if the metadata partition will be failed back - * we need to flush and close all datasets including metadata datasets - * otherwise we need to close all non-metadata datasets and flush metadata datasets - * so that their memory components will be copied to the failing back node - */ - if (releaseMetadataNode) { - appContext.getDatasetLifecycleManager().closeAllDatasets(); - //remove the metadata node stub from RMI registry - try { - appContext.unexportMetadataNodeStub(); - } catch (RemoteException e) { - LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e); - throw ExceptionUtils.convertToHyracksDataException(e); - } - } else { - //close all non-metadata datasets - appContext.getDatasetLifecycleManager().closeUserDatasets(); - //flush the remaining metadata datasets that were not closed - appContext.getDatasetLifecycleManager().flushAllDatasets(); - } - - //mark the partitions to be closed as inactive - PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext - .getLocalResourceRepository(); - for (Integer partitionId : partitions) { - localResourceRepo.addInactivePartition(partitionId); - } - - //send response after partitions prepared for failback - PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId, - requestId, partitions); - try { - broker.sendMessageToCC(reponse); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Failed sending message to cc", e); - throw ExceptionUtils.convertToHyracksDataException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java deleted file mode 100644 index c655ecd..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.util.Set; - -import org.apache.asterix.runtime.utils.ClusterStateManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage { - - private static final long serialVersionUID = 1L; - private final Set<Integer> partitions; - - public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) { - super(planId, requestId); - this.partitions = partitions; - } - - public Set<Integer> getPartitions() { - return partitions; - } - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this); - } - - @Override - public String toString() { - return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java deleted file mode 100644 index 1e67052..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.exceptions.ExceptionUtils; -import org.apache.asterix.common.messaging.api.IApplicationMessage; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class TakeoverMetadataNodeRequestMessage implements IApplicationMessage { - - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName()); - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - NodeControllerService ncs = (NodeControllerService) cs; - IAppRuntimeContext appContext = - (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); - INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); - HyracksDataException hde = null; - try { - appContext.initializeMetadata(false); - appContext.exportMetadataNodeStub(); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Failed taking over metadata", e); - hde = new HyracksDataException(e); - } finally { - TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage( - appContext.getTransactionSubsystem().getId()); - try { - broker.sendMessageToCC(reponse); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Failed taking over metadata", e); - hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); - } - } - if (hde != null) { - throw hde; - } - } - - @Override - public String toString() { - return TakeoverMetadataNodeRequestMessage.class.getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java deleted file mode 100644 index d8b2136..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import org.apache.asterix.common.messaging.api.IApplicationMessage; -import org.apache.asterix.runtime.utils.ClusterStateManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -public class TakeoverMetadataNodeResponseMessage implements IApplicationMessage { - - private static final long serialVersionUID = 1L; - private final String nodeId; - - public TakeoverMetadataNodeResponseMessage(String nodeId) { - this.nodeId = nodeId; - } - - public String getNodeId() { - return nodeId; - } - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this); - } - - @Override - public String toString() { - return TakeoverMetadataNodeResponseMessage.class.getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java deleted file mode 100644 index fb8a33b..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import java.io.IOException; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.exceptions.ExceptionUtils; -import org.apache.asterix.common.messaging.api.IApplicationMessage; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.replication.IRemoteRecoveryManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class TakeoverPartitionsRequestMessage implements IApplicationMessage { - - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName()); - private final Integer[] partitions; - private final long requestId; - private final String nodeId; - - public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) { - this.requestId = requestId; - this.nodeId = nodeId; - this.partitions = partitionsToTakeover; - } - - public Integer[] getPartitions() { - return partitions; - } - - public long getRequestId() { - return requestId; - } - - public String getNodeId() { - return nodeId; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName()); - sb.append(" Request ID: " + requestId); - sb.append(" Node ID: " + nodeId); - sb.append(" Partitions: "); - for (Integer partitionId : partitions) { - sb.append(partitionId + ","); - } - //remove last comma - sb.charAt(sb.length() - 1); - return sb.toString(); - } - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - NodeControllerService ncs = (NodeControllerService) cs; - IAppRuntimeContext appContext = - (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); - INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); - //if the NC is shutting down, it should ignore takeover partitions request - if (!appContext.isShuttingdown()) { - HyracksDataException hde = null; - try { - IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager(); - remoteRecoeryManager.takeoverPartitons(partitions); - } catch (IOException | ACIDException e) { - LOGGER.log(Level.SEVERE, "Failure taking over partitions", e); - hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); - } finally { - //send response after takeover is completed - TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId, - appContext.getTransactionSubsystem().getId(), partitions); - try { - broker.sendMessageToCC(reponse); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Failure taking over partitions", e); - hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); - } - } - if (hde != null) { - throw hde; - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java deleted file mode 100644 index 81492c2..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import org.apache.asterix.common.messaging.api.IApplicationMessage; -import org.apache.asterix.runtime.utils.ClusterStateManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; - -public class TakeoverPartitionsResponseMessage implements IApplicationMessage { - - private static final long serialVersionUID = 1L; - private final Integer[] partitions; - private final String nodeId; - private final long requestId; - - public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) { - this.requestId = requestId; - this.nodeId = nodeId; - this.partitions = partitionsToTakeover; - } - - public Integer[] getPartitions() { - return partitions; - } - - public String getNodeId() { - return nodeId; - } - - public long getRequestId() { - return requestId; - } - - @Override - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { - ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this); - } - - @Override - public String toString() { - return TakeoverPartitionsResponseMessage.class.getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java index 355f503..6193931 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java @@ -18,33 +18,23 @@ */ package org.apache.asterix.runtime.utils; -import java.io.IOException; -import java.util.function.Supplier; -import java.util.logging.Logger; - import org.apache.asterix.common.cluster.IGlobalRecoveryManager; -import org.apache.asterix.common.config.BuildProperties; -import org.apache.asterix.common.config.CompilerProperties; -import org.apache.asterix.common.config.ExtensionProperties; -import org.apache.asterix.common.config.ExternalProperties; -import org.apache.asterix.common.config.FeedProperties; -import org.apache.asterix.common.config.IPropertiesProvider; -import org.apache.asterix.common.config.MessagingProperties; -import org.apache.asterix.common.config.MetadataProperties; -import org.apache.asterix.common.config.PropertiesAccessor; -import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.config.TransactionProperties; +import org.apache.asterix.common.config.*; import org.apache.asterix.common.dataflow.IApplicationContextInfo; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.metadata.IMetadataBootstrap; +import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.common.IStorageManager; +import java.io.IOException; +import java.util.function.Supplier; +import java.util.logging.Logger; + /* * Acts as an holder class for IndexRegistryProvider, AsterixStorageManager * instances that are accessed from the NCs. In addition an instance of ICCApplicationContext @@ -71,13 +61,15 @@ public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvi private IHyracksClientConnection hcc; private Object extensionManager; private volatile boolean initialized = false; + private IFaultToleranceStrategy ftStrategy; private AppContextInfo() { } public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, IResourceIdManager resourceIdManager, - Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager) + Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, + IFaultToleranceStrategy ftStrategy) throws AsterixException, IOException { if (INSTANCE.initialized) { throw new AsterixException(AppContextInfo.class.getSimpleName() + " has been initialized already"); @@ -98,6 +90,7 @@ public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvi INSTANCE.feedProperties = new FeedProperties(propertiesAccessor); INSTANCE.extensionProperties = new ExtensionProperties(propertiesAccessor); INSTANCE.replicationProperties = new ReplicationProperties(propertiesAccessor); + INSTANCE.ftStrategy = ftStrategy; INSTANCE.hcc = hcc; INSTANCE.buildProperties = new BuildProperties(propertiesAccessor); INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor); @@ -205,4 +198,8 @@ public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvi public IMetadataBootstrap getMetadataBootstrap() { return metadataBootstrapSupplier.get(); } + + public IFaultToleranceStrategy getFaultToleranceStrategy() { + return ftStrategy; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 8fffdb1..d975a98 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -19,50 +19,36 @@ package org.apache.asterix.runtime.utils; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.logging.Level; import java.util.logging.Logger; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.event.schema.cluster.Cluster; import org.apache.asterix.event.schema.cluster.Node; -import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage; -import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage; -import org.apache.asterix.runtime.message.NodeFailbackPlan; -import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState; -import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage; -import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage; -import org.apache.asterix.runtime.message.ReplicaEventMessage; -import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage; -import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage; -import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage; -import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; /** * A holder class for properties related to the Asterix cluster. */ -public class ClusterStateManager { +public class ClusterStateManager implements IClusterStateManager { /* * TODO: currently after instance restarts we require all nodes to join again, * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance @@ -71,9 +57,8 @@ public class ClusterStateManager { private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName()); public static final ClusterStateManager INSTANCE = new ClusterStateManager(); - private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address"; private static final String IO_DEVICES = "iodevices"; - private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>(); + private final Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>(); private final Cluster cluster; private ClusterState state = ClusterState.UNUSABLE; @@ -84,32 +69,21 @@ public class ClusterStateManager { private Map<String, ClusterPartition[]> node2PartitionsMap = null; private SortedMap<Integer, ClusterPartition> clusterPartitions = null; - private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null; - private long clusterRequestId = 0; private String currentMetadataNode = null; private boolean metadataNodeActive = false; - private boolean autoFailover = false; - private boolean replicationEnabled = false; private Set<String> failedNodes = new HashSet<>(); - private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans; - private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap; + private IFaultToleranceStrategy ftStrategy; private ClusterStateManager() { cluster = ClusterProperties.INSTANCE.getCluster(); // if this is the CC process - if (AppContextInfo.INSTANCE.initialized() - && AppContextInfo.INSTANCE.getCCApplicationContext() != null) { + if (AppContextInfo.INSTANCE.initialized() && AppContextInfo.INSTANCE.getCCApplicationContext() != null) { node2PartitionsMap = AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions(); clusterPartitions = AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions(); currentMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName(); - replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); - autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled(); - if (autoFailover) { - pendingTakeoverRequests = new HashMap<>(); - pendingProcessingFailbackPlans = new LinkedList<>(); - planId2FailbackPlanMap = new HashMap<>(); - } + ftStrategy = AppContextInfo.INSTANCE.getFaultToleranceStrategy(); + ftStrategy.bindTo(this); } } @@ -117,30 +91,8 @@ public class ClusterStateManager { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Removing configuration parameters for node id " + nodeId); } - activeNcConfiguration.remove(nodeId); - - //if this node was waiting for failback and failed before it completed - if (failedNodes.contains(nodeId)) { - if (autoFailover) { - notifyFailbackPlansNodeFailure(nodeId); - revertFailedFailbackPlanEffects(); - } - } else { - //an active node failed - failedNodes.add(nodeId); - if (nodeId.equals(currentMetadataNode)) { - metadataNodeActive = false; - LOGGER.info("Metadata node is now inactive"); - } - updateNodePartitions(nodeId, false); - if (replicationEnabled) { - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE); - if (autoFailover) { - notifyFailbackPlansNodeFailure(nodeId); - requestPartitionsTakeover(nodeId); - } - } - } + failedNodes.add(nodeId); + ftStrategy.notifyNodeFailure(nodeId); } public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) @@ -149,46 +101,51 @@ public class ClusterStateManager { LOGGER.info("Registering configuration parameters for node id " + nodeId); } activeNcConfiguration.put(nodeId, configuration); + failedNodes.remove(nodeId); + ftStrategy.notifyNodeJoin(nodeId); + } - //a node trying to come back after failure - if (failedNodes.contains(nodeId)) { - if (autoFailover) { - prepareFailbackPlan(nodeId); - return; - } else { - //a node completed local or remote recovery and rejoined - failedNodes.remove(nodeId); - if (replicationEnabled) { - //notify other replica to reconnect to this node - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); - } - } - } + @Override + public synchronized void setState(ClusterState state) { + this.state = state; + LOGGER.info("Cluster State is now " + state.name()); + } - if (nodeId.equals(currentMetadataNode)) { - metadataNodeActive = true; - LOGGER.info("Metadata node is now active"); + @Override + public void updateMetadataNode(String nodeId, boolean active) { + currentMetadataNode = nodeId; + metadataNodeActive = active; + if (active) { + LOGGER.info(String.format("Metadata node %s is now active", currentMetadataNode)); } - updateNodePartitions(nodeId, true); } - private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException { + @Override + public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException { ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId); // if this isn't a storage node, it will not have cluster partitions if (nodePartitions != null) { for (ClusterPartition p : nodePartitions) { - // set the active node for this node's partitions - p.setActive(added); - if (added) { - p.setActiveNodeId(nodeId); - } + updateClusterPartition(p.getPartitionId(), nodeId, active); } - resetClusterPartitionConstraint(); - updateClusterState(); } } - private synchronized void updateClusterState() throws HyracksDataException { + @Override + public synchronized void updateClusterPartition(Integer partitionNum, String activeNode, boolean active) { + ClusterPartition clusterPartition = clusterPartitions.get(partitionNum); + if (clusterPartition != null) { + // set the active node for this node's partitions + clusterPartition.setActive(active); + if (active) { + clusterPartition.setActiveNodeId(activeNode); + } + } + } + + @Override + public synchronized void refreshState() throws HyracksDataException { + resetClusterPartitionConstraint(); for (ClusterPartition p : clusterPartitions.values()) { if (!p.isActive()) { state = ClusterState.UNUSABLE; @@ -196,20 +153,19 @@ public class ClusterStateManager { return; } } + + state = ClusterState.PENDING; + LOGGER.info("Cluster is now " + state); + // if all storage partitions are active as well as the metadata node, then the cluster is active if (metadataNodeActive) { - state = ClusterState.PENDING; - LOGGER.info("Cluster is now " + state); AppContextInfo.INSTANCE.getMetadataBootstrap().init(); state = ClusterState.ACTIVE; LOGGER.info("Cluster is now " + state); + // Notify any waiting threads for the cluster to be active. + notifyAll(); // start global recovery AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery(); - if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) { - processPendingFailbackPlans(); - } - } else { - requestMetadataNodeTakeover(); } } @@ -232,6 +188,7 @@ public class ClusterStateManager { return ncConfig.get(IO_DEVICES).split(","); } + @Override public ClusterState getState() { return state; } @@ -287,6 +244,7 @@ public class ClusterStateManager { return AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size(); } + @Override public synchronized ClusterPartition[] getNodePartitions(String nodeId) { return node2PartitionsMap.get(nodeId); } @@ -298,6 +256,7 @@ public class ClusterStateManager { return 0; } + @Override public synchronized ClusterPartition[] getClusterPartitons() { ArrayList<ClusterPartition> partitons = new ArrayList<>(); for (ClusterPartition partition : clusterPartitions.values()) { @@ -306,331 +265,6 @@ public class ClusterStateManager { return partitons.toArray(new ClusterPartition[] {}); } - private synchronized void requestPartitionsTakeover(String failedNodeId) { - //replica -> list of partitions to takeover - Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>(); - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); - - //collect the partitions of the failed NC - List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId); - if (!lostPartitions.isEmpty()) { - for (ClusterPartition partition : lostPartitions) { - //find replicas for this partitions - Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId()); - //find a replica that is still active - for (String replica : partitionReplicas) { - //TODO (mhubail) currently this assigns the partition to the first found active replica. - //It needs to be modified to consider load balancing. - if (addActiveReplica(replica, partition, partitionRecoveryPlan)) { - break; - } - } - } - - if (partitionRecoveryPlan.size() == 0) { - //no active replicas were found for the failed node - LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions); - return; - } else { - LOGGER.info("Partitions to recover: " + lostPartitions); - } - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - //For each replica, send a request to takeover the assigned partitions - for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) { - String replica = entry.getKey(); - Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]); - long requestId = clusterRequestId++; - TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, - replica, partitionsToTakeover); - pendingTakeoverRequests.put(requestId, takeoverRequest); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, replica); - } catch (Exception e) { - /** - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, we will send any pending request - * that belongs to the failed NC to a different active replica. - */ - LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e); - } - } - } - } - - private boolean addActiveReplica(String replica, ClusterPartition partition, - Map<String, List<Integer>> partitionRecoveryPlan) { - if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) { - if (!partitionRecoveryPlan.containsKey(replica)) { - List<Integer> replicaPartitions = new ArrayList<>(); - replicaPartitions.add(partition.getPartitionId()); - partitionRecoveryPlan.put(replica, replicaPartitions); - } else { - partitionRecoveryPlan.get(replica).add(partition.getPartitionId()); - } - return true; - } - return false; - } - - private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) { - List<ClusterPartition> nodePartitions = new ArrayList<>(); - for (ClusterPartition partition : clusterPartitions.values()) { - if (partition.getActiveNodeId().equals(nodeId)) { - nodePartitions.add(partition); - } - } - /** - * if there is any pending takeover request that this node was supposed to handle, - * it needs to be sent to a different replica - */ - List<Long> failedTakeoverRequests = new ArrayList<>(); - for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) { - if (request.getNodeId().equals(nodeId)) { - for (Integer partitionId : request.getPartitions()) { - nodePartitions.add(clusterPartitions.get(partitionId)); - } - failedTakeoverRequests.add(request.getRequestId()); - } - } - - //remove failed requests - for (Long requestId : failedTakeoverRequests) { - pendingTakeoverRequests.remove(requestId); - } - return nodePartitions; - } - - private synchronized void requestMetadataNodeTakeover() { - //need a new node to takeover metadata node - ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties() - .getMetadataPartition(); - //request the metadataPartition node to register itself as the metadata node - TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage(); - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId()); - } catch (Exception e) { - /** - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, a new NC will be assigned to - * the metadata partition and a new metadata node takeover request will be sent to it. - */ - LOGGER.log(Level.WARNING, - "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e); - } - } - - public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response) - throws HyracksDataException { - for (Integer partitonId : response.getPartitions()) { - ClusterPartition partition = clusterPartitions.get(partitonId); - partition.setActive(true); - partition.setActiveNodeId(response.getNodeId()); - } - pendingTakeoverRequests.remove(response.getRequestId()); - resetClusterPartitionConstraint(); - updateClusterState(); - } - - public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response) - throws HyracksDataException { - currentMetadataNode = response.getNodeId(); - metadataNodeActive = true; - LOGGER.info("Current metadata node: " + currentMetadataNode); - updateClusterState(); - } - - private synchronized void prepareFailbackPlan(String failingBackNodeId) { - NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId); - pendingProcessingFailbackPlans.add(plan); - planId2FailbackPlanMap.put(plan.getPlanId(), plan); - - //get all partitions this node requires to resync - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); - Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId); - for (String replicaId : nodeReplicas) { - ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId); - for (ClusterPartition partition : nodePartitions) { - plan.addParticipant(partition.getActiveNodeId()); - /** - * if the partition original node is the returning node, - * add it to the list of the partitions which will be failed back - */ - if (partition.getNodeId().equals(failingBackNodeId)) { - plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId()); - } - } - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Prepared Failback plan: " + plan.toString()); - } - - processPendingFailbackPlans(); - } - - private synchronized void processPendingFailbackPlans() { - /** - * if the cluster state is not ACTIVE, then failbacks should not be processed - * since some partitions are not active - */ - if (state == ClusterState.ACTIVE) { - while (!pendingProcessingFailbackPlans.isEmpty()) { - //take the first pending failback plan - NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop(); - /** - * A plan at this stage will be in one of two states: - * 1. PREPARING -> the participants were selected but we haven't sent any request. - * 2. PENDING_ROLLBACK -> a participant failed before we send any requests - */ - if (plan.getState() == FailbackPlanState.PREPARING) { - //set the partitions that will be failed back as inactive - String failbackNode = plan.getNodeId(); - for (Integer partitionId : plan.getPartitionsToFailback()) { - ClusterPartition clusterPartition = clusterPartitions.get(partitionId); - clusterPartition.setActive(false); - //partition expected to be returned to the failing back node - clusterPartition.setActiveNodeId(failbackNode); - } - - /** - * if the returning node is the original metadata node, - * then metadata node will change after the failback completes - */ - String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties() - .getMetadataNodeName(); - if (originalMetadataNode.equals(failbackNode)) { - plan.setNodeToReleaseMetadataManager(currentMetadataNode); - currentMetadataNode = ""; - metadataNodeActive = false; - } - - //force new jobs to wait - state = ClusterState.REBALANCING; - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE - .getCCApplicationContext().getMessageBroker(); - handleFailbackRequests(plan, messageBroker); - /** - * wait until the current plan is completed before processing the next plan. - * when the current one completes or is reverted, the cluster state will be - * ACTIVE again, and the next failback plan (if any) will be processed. - */ - break; - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //this plan failed before sending any requests -> nothing to rollback - planId2FailbackPlanMap.remove(plan.getPlanId()); - } - } - } - } - - private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) { - //send requests to other nodes to complete on-going jobs and prepare partitions for failback - for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) { - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeID()); - plan.addPendingRequest(request); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e); - plan.notifyNodeFailure(request.getNodeID()); - revertFailedFailbackPlanEffects(); - break; - } - } - } - - public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) { - NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId()); - plan.markRequestCompleted(msg.getRequestId()); - /** - * A plan at this stage will be in one of three states: - * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait). - * 2. PENDING_COMPLETION -> all responses received (time to send completion request). - * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert). - */ - if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) { - CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage(); - - //send complete resync and takeover partitions to the failing back node - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeId()); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e); - notifyFailbackPlansNodeFailure(request.getNodeId()); - revertFailedFailbackPlanEffects(); - } - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - revertFailedFailbackPlanEffects(); - } - } - - public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response) - throws HyracksDataException { - /** - * the failback plan completed successfully: - * Remove all references to it. - * Remove the the failing back node from the failed nodes list. - * Notify its replicas to reconnect to it. - * Set the failing back node partitions as active. - */ - NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId()); - String nodeId = plan.getNodeId(); - failedNodes.remove(nodeId); - //notify impacted replicas they can reconnect to this node - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); - updateNodePartitions(nodeId, true); - } - - private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) { - ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties(); - Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId); - String nodeIdAddress = ""; - //in case the node joined with a new IP address, we need to send it to the other replicas - if (event == ClusterEventType.NODE_JOIN) { - nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY); - } - - ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event); - ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext() - .getMessageBroker(); - for (String replica : remoteReplicas) { - //if the remote replica is alive, send the event - if (activeNcConfiguration.containsKey(replica)) { - try { - messageBroker.sendApplicationMessageToNC(msg, replica); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e); - } - } - } - } - - private synchronized void revertFailedFailbackPlanEffects() { - Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); - while (iterator.hasNext()) { - NodeFailbackPlan plan = iterator.next(); - if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //TODO if the failing back node is still active, notify it to construct a new plan for it - iterator.remove(); - - //reassign the partitions that were supposed to be failed back to an active replica - requestPartitionsTakeover(plan.getNodeId()); - } - } - } - - private synchronized void notifyFailbackPlansNodeFailure(String nodeId) { - Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator(); - while (iterator.hasNext()) { - NodeFailbackPlan plan = iterator.next(); - plan.notifyNodeFailure(nodeId); - } - } - public synchronized boolean isMetadataNodeActive() { return metadataNodeActive; } @@ -676,4 +310,14 @@ public class ClusterStateManager { stateDescription.putPOJO("partitions", clusterPartitions); return stateDescription; } + + @Override + public Map<String, Map<String, String>> getActiveNcConfiguration() { + return Collections.unmodifiableMap(activeNcConfiguration); + } + + @Override + public String getCurrentMetadataNodeId() { + return currentMetadataNode; + } }
