http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java deleted file mode 100644 index 880a715..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ /dev/null @@ -1,569 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- * - */

package org.apache.hadoop.hdds.scm.container.states;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
    .CONTAINER_EXISTS;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
    .FAILED_TO_CHANGE_CONTAINER_STATE;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
    .FAILED_TO_FIND_CONTAINER;

/**
 * Container State Map acts like a unified map for various attributes that are
 * used to select containers when we need to allocate blocks.
 * <p>
 * This class provides the ability to query 4 classes of attributes. They are
 * <p>
 * 1. LifeCycleStates - LifeCycle States of container describe in which state
 * a container is. For example, a container needs to be in Open State for a
 * client to be able to write to it.
 * <p>
 * 2. Owners - Each instance of Name service, for example, Namenode of HDFS or
 * Ozone Manager (OM) of Ozone or CBlockServer -- is an owner. It is
 * possible to have many OMs for an Ozone cluster and only one SCM. But SCM
 * keeps the data from each OM in a separate bucket, never mixing them. To
 * write data, often we have to find all open containers for a specific owner.
 * <p>
 * 3. ReplicationType - The clients are allowed to specify what kind of
 * replication pipeline they want to use. Each Container exists on top of a
 * pipeline, so we need to get ReplicationType that is specified by the user.
 * <p>
 * 4. ReplicationFactor - The replication factor represents how many copies
 * of data should be made, right now we support 2 different types, ONE
 * Replica and THREE Replica. User can specify how many copies should be made
 * for an Ozone key.
 * <p>
 * The most common access pattern of this class is to select a container based
 * on all these parameters, for example, when allocating a block we will
 * select a container that belongs to user1, with Ratis replication which can
 * make 3 copies of data. We look for open containers by default, and if we
 * cannot find them new containers are added.
 * <p>
 * Every operation on the container map, the replica map and the attribute
 * maps is performed while holding the internal read/write lock.
 */
public class ContainerStateMap {
  private static final Logger LOG =
      LoggerFactory.getLogger(ContainerStateMap.class);

  private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
  private final ContainerAttribute<String> ownerMap;
  private final ContainerAttribute<ReplicationFactor> factorMap;
  private final ContainerAttribute<ReplicationType> typeMap;

  private final Map<ContainerID, ContainerInfo> containerMap;
  // Map to hold replicas of given container.
  private final Map<ContainerID, Set<DatanodeDetails>> contReplicaMap;
  private static final NavigableSet<ContainerID> EMPTY_SET =
      Collections.unmodifiableNavigableSet(new TreeSet<>());
  // Memoized results of getMatchingContainerIDs, keyed by the full query.
  private final Map<ContainerQueryKey, NavigableSet<ContainerID>> resultCache;

  // Container State Map lock should be held before calling into
  // Update ContainerAttributes. The consistency of ContainerAttributes is
  // protected by this lock.
  private final ReadWriteLock lock;

  /**
   * Create a ContainerStateMap.
   */
  public ContainerStateMap() {
    lifeCycleStateMap = new ContainerAttribute<>();
    ownerMap = new ContainerAttribute<>();
    factorMap = new ContainerAttribute<>();
    typeMap = new ContainerAttribute<>();
    containerMap = new HashMap<>();
    lock = new ReentrantReadWriteLock();
    contReplicaMap = new HashMap<>();
    resultCache = new ConcurrentHashMap<>();
  }

  /**
   * Adds a ContainerInfo Entry in the ContainerStateMap.
   *
   * @param info - container info
   * @throws SCMException - with result CONTAINER_EXISTS if a container with
   *                      the same ID is already present.
   */
  public void addContainer(ContainerInfo info)
      throws SCMException {
    Preconditions.checkNotNull(info, "Container Info cannot be null");
    Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
        "ExpectedReplicaCount should be greater than 0");

    lock.writeLock().lock();
    try {
      ContainerID id = ContainerID.valueof(info.getContainerID());
      if (containerMap.putIfAbsent(id, info) != null) {
        LOG.debug("Duplicate container ID detected. {}", id);
        throw new
            SCMException("Duplicate container ID detected.",
            CONTAINER_EXISTS);
      }

      lifeCycleStateMap.insert(info.getState(), id);
      ownerMap.insert(info.getOwner(), id);
      factorMap.insert(info.getReplicationFactor(), id);
      typeMap.insert(info.getReplicationType(), id);

      // Flush the cache of this container type, will be added later when
      // get container queries are executed.
      flushCache(info);
      LOG.trace("Created container with {} successfully.", id);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Returns the latest state of Container from SCM's Container State Map.
   *
   * @param info - ContainerInfo whose container ID is looked up
   * @return ContainerInfo stored for that ID, or null if there is none
   */
  public ContainerInfo getContainerInfo(ContainerInfo info) {
    return getContainerInfo(info.getContainerID());
  }

  /**
   * Returns the latest state of Container from SCM's Container State Map.
   *
   * @param containerID - numeric container ID
   * @return container info, if found; null otherwise.
   */
  public ContainerInfo getContainerInfo(long containerID) {
    return getContainerInfo(ContainerID.valueof(containerID));
  }

  /**
   * Returns the latest state of Container from SCM's Container State Map.
   *
   * @param containerID - ContainerID
   * @return container info, if found; null otherwise.
   */
  public ContainerInfo getContainerInfo(ContainerID containerID) {
    lock.readLock().lock();
    try {
      return containerMap.get(containerID);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Returns the latest list of DataNodes where replica for given containerId
   * exist. Throws an SCMException if no entry is found for given containerId.
   * <p>
   * The returned set is an unmodifiable snapshot taken under the read lock,
   * so it can be iterated safely while replicas are added or removed.
   *
   * @param containerID - ContainerID
   * @return Set<DatanodeDetails>
   * @throws SCMException - with result NO_REPLICA_FOUND if the replica map
   *                      has no entry for the container.
   */
  public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
      throws SCMException {
    Preconditions.checkNotNull(containerID);
    lock.readLock().lock();
    try {
      Set<DatanodeDetails> replicas = contReplicaMap.get(containerID);
      if (replicas != null) {
        // Copy before releasing the lock: the backing HashSet is mutated by
        // addContainerReplica/removeContainerReplica under the write lock.
        return Collections.unmodifiableSet(new HashSet<>(replicas));
      }
    } finally {
      lock.readLock().unlock();
    }
    throw new SCMException(
        "No entry exist for containerId: " + containerID + " in replica map.",
        ResultCodes.NO_REPLICA_FOUND);
  }

  /**
   * Adds given datanodes as nodes where replica for given containerId exist.
   * Logs a debug entry if a datanode is already added as replica for given
   * ContainerId.
   *
   * @param containerID - ContainerID
   * @param dnList - datanodes holding a replica; no element may be null
   */
  public void addContainerReplica(ContainerID containerID,
      DatanodeDetails... dnList) {
    Preconditions.checkNotNull(containerID);
    lock.writeLock().lock();
    try {
      for (DatanodeDetails dn : dnList) {
        Preconditions.checkNotNull(dn);
        Set<DatanodeDetails> replicas =
            contReplicaMap.computeIfAbsent(containerID, k -> new HashSet<>());
        if (!replicas.add(dn)) {
          LOG.debug("ReplicaMap already contains entry for container Id: "
              + "{},DataNode: {}", containerID, dn);
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Remove a container Replica for given DataNode.
   *
   * @param containerID - ContainerID
   * @param dn - datanode whose replica entry is removed
   * @return True if dataNode is removed successfully else false.
   * @throws SCMException - with result FAILED_TO_FIND_CONTAINER if the
   *                      replica map has no entry for the container.
   */
  public boolean removeContainerReplica(ContainerID containerID,
      DatanodeDetails dn) throws SCMException {
    Preconditions.checkNotNull(containerID);
    Preconditions.checkNotNull(dn);

    lock.writeLock().lock();
    try {
      Set<DatanodeDetails> replicas = contReplicaMap.get(containerID);
      if (replicas != null) {
        return replicas.remove(dn);
      }
    } finally {
      lock.writeLock().unlock();
    }
    throw new SCMException(
        "No entry exist for containerId: " + containerID + " in replica map.",
        ResultCodes.FAILED_TO_FIND_CONTAINER);
  }

  @VisibleForTesting
  public static Logger getLOG() {
    return LOG;
  }

  /**
   * Returns the full container Map.
   * <p>
   * The result is an unmodifiable view backed by the internal map, not a
   * snapshot: it reflects later updates and is no longer protected by the
   * lock once this method has returned.
   *
   * @return - Map
   */
  public Map<ContainerID, ContainerInfo> getContainerMap() {
    lock.readLock().lock();
    try {
      return Collections.unmodifiableMap(containerMap);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Just update the container State. The attribute maps are not touched;
   * only the stored ContainerInfo is replaced and the result cache entries
   * of the old and new info are flushed.
   *
   * @param info ContainerInfo.
   * @throws SCMException - with result FAILED_TO_FIND_CONTAINER if the
   *                      container is not present in the map.
   */
  public void updateContainerInfo(ContainerInfo info) throws SCMException {
    Preconditions.checkNotNull(info);
    // Use one key for both the lookup and the replacement.
    ContainerID id = ContainerID.valueof(info.getContainerID());
    lock.writeLock().lock();
    try {
      ContainerInfo currentInfo = containerMap.get(id);
      if (currentInfo == null) {
        throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
      }
      flushCache(info, currentInfo);
      containerMap.put(id, info);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Update the State of a container.
   * <p>
   * The lifecycle attribute map is updated first, because it is the only
   * step that can fail. The passed-in info and the container map are only
   * modified once that step has succeeded, so a failure leaves this map
   * unchanged apart from the flushed cache entries and whatever the
   * attribute map does internally on failure.
   *
   * @param info - ContainerInfo
   * @param currentState - CurrentState
   * @param newState - NewState.
   * @throws SCMException - with result FAILED_TO_FIND_CONTAINER if the
   *                      container is unknown, or
   *                      FAILED_TO_CHANGE_CONTAINER_STATE if the lifecycle
   *                      attribute map rejects the transition.
   */
  public void updateState(ContainerInfo info, LifeCycleState currentState,
      LifeCycleState newState) throws SCMException {
    Preconditions.checkNotNull(info, "Container Info cannot be null");
    Preconditions.checkNotNull(currentState);
    Preconditions.checkNotNull(newState);

    ContainerID id = new ContainerID(info.getContainerID());

    lock.writeLock().lock();
    try {
      // Reject unknown containers before anything is modified. Doing this
      // inside the failure handler below would store a null entry for id.
      if (containerMap.get(id) == null) {
        throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
      }

      // Just flush both old and new data sets from the result cache.
      ContainerInfo newInfo = new ContainerInfo(info);
      newInfo.setState(newState);
      flushCache(newInfo, info);

      try {
        lifeCycleStateMap.update(currentState, newState, id);
      } catch (SCMException ex) {
        // Nothing else has been modified yet, so there is nothing to revert.
        LOG.error("Unable to update the container state.", ex);
        throw new SCMException("Updating the container map failed.", ex,
            FAILED_TO_CHANGE_CONTAINER_STATE);
      }

      info.setState(newState);
      containerMap.put(id, info);
      LOG.trace("Updated the container {} to new state. Old = {}, new = " +
          "{}", id, currentState, newState);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Returns A list of containers owned by a name service.
   *
   * @param ownerName - Name of the NameService.
   * @return - NavigableSet of ContainerIDs.
   */
  NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
    Preconditions.checkNotNull(ownerName);
    lock.readLock().lock();
    try {
      return ownerMap.getCollection(ownerName);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Returns Containers in the System by the Type.
   *
   * @param type - Replication type -- StandAlone, Ratis etc.
   * @return NavigableSet
   */
  NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
    Preconditions.checkNotNull(type);
    lock.readLock().lock();
    try {
      return typeMap.getCollection(type);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Returns Containers by replication factor.
   *
   * @param factor - Replication Factor.
   * @return NavigableSet.
   */
  NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
    Preconditions.checkNotNull(factor);
    lock.readLock().lock();
    try {
      return factorMap.getCollection(factor);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Returns Containers by State.
   *
   * @param state - State - Open, Closed etc.
   * @return List of containers by state.
   */
  public NavigableSet<ContainerID> getContainerIDsByState(
      LifeCycleState state) {
    Preconditions.checkNotNull(state);
    lock.readLock().lock();
    try {
      return lifeCycleStateMap.getCollection(state);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Gets the containers that matches the following filters.
   * <p>
   * Non-empty results are memoized per query and handed out as unmodifiable
   * sets, so a caller cannot alter a cached result.
   *
   * @param state - LifeCycleState
   * @param owner - Owner
   * @param factor - Replication Factor
   * @param type - Replication Type
   * @return IDs of the containers satisfying all four criteria, or an empty
   * set if any single criterion has no containers.
   */
  public NavigableSet<ContainerID> getMatchingContainerIDs(
      LifeCycleState state, String owner,
      ReplicationFactor factor, ReplicationType type) {

    Preconditions.checkNotNull(state, "State cannot be null");
    Preconditions.checkNotNull(owner, "Owner cannot be null");
    Preconditions.checkNotNull(factor, "Factor cannot be null");
    Preconditions.checkNotNull(type, "Type cannot be null");

    lock.readLock().lock();
    try {
      ContainerQueryKey queryKey =
          new ContainerQueryKey(state, owner, factor, type);
      NavigableSet<ContainerID> cached = resultCache.get(queryKey);
      if (cached != null) {
        return cached;
      }

      // If we cannot meet any one condition we return EMPTY_SET immediately.
      // Since when we intersect these sets, the result will be empty if any
      // one is empty.
      NavigableSet<ContainerID> stateSet =
          lifeCycleStateMap.getCollection(state);
      if (stateSet.size() == 0) {
        return EMPTY_SET;
      }

      NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner);
      if (ownerSet.size() == 0) {
        return EMPTY_SET;
      }

      NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor);
      if (factorSet.size() == 0) {
        return EMPTY_SET;
      }

      NavigableSet<ContainerID> typeSet = typeMap.getCollection(type);
      if (typeSet.size() == 0) {
        return EMPTY_SET;
      }

      // if we add more constraints we will just add those sets here..
      NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
          ownerSet, factorSet, typeSet);

      NavigableSet<ContainerID> currentSet = sets[0];
      // We take the smallest set and intersect against the larger sets. This
      // allows us to reduce the lookups to the least possible number.
      for (int x = 1; x < sets.length; x++) {
        currentSet = intersectSets(currentSet, sets[x]);
      }

      // The same instance is shared with every later caller of this query.
      NavigableSet<ContainerID> result =
          Collections.unmodifiableNavigableSet(currentSet);
      resultCache.put(queryKey, result);
      return result;
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Calculates the intersection between sets and returns a new set.
   *
   * @param smaller - First Set
   * @param bigger - Second Set
   * @return resultSet which is the intersection of these two sets.
   */
  private NavigableSet<ContainerID> intersectSets(
      NavigableSet<ContainerID> smaller,
      NavigableSet<ContainerID> bigger) {
    Preconditions.checkState(smaller.size() <= bigger.size(),
        "This function assumes the first set is lesser or equal to second " +
            "set");
    NavigableSet<ContainerID> resultSet = new TreeSet<>();
    for (ContainerID id : smaller) {
      if (bigger.contains(id)) {
        resultSet.add(id);
      }
    }
    return resultSet;
  }

  /**
   * Sorts a list of Sets based on Size. This is useful when we are
   * intersecting the sets. The passed array is sorted in place.
   *
   * @param sets - varargs of sets
   * @return Returns a sorted array of sets based on the size of the set.
   */
  @SuppressWarnings("unchecked")
  private NavigableSet<ContainerID>[] sortBySize(
      NavigableSet<ContainerID>... sets) {
    for (int x = 0; x < sets.length - 1; x++) {
      for (int y = 0; y < sets.length - x - 1; y++) {
        if (sets[y].size() > sets[y + 1].size()) {
          NavigableSet<ContainerID> temp = sets[y];
          sets[y] = sets[y + 1];
          sets[y + 1] = temp;
        }
      }
    }
    return sets;
  }

  /**
   * Removes from the result cache the query entries that correspond to the
   * state, owner, factor and type of each given container.
   *
   * @param containerInfos - containers whose cached query results are stale
   */
  private void flushCache(ContainerInfo... containerInfos) {
    for (ContainerInfo containerInfo : containerInfos) {
      ContainerQueryKey key = new ContainerQueryKey(containerInfo.getState(),
          containerInfo.getOwner(), containerInfo.getReplicationFactor(),
          containerInfo.getReplicationType());
      resultCache.remove(key);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java deleted file mode 100644 index 8ad1c8b..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * - */ - -/** - * Container States package. 
- */ -package org.apache.hadoop.hdds.scm.container.states; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java deleted file mode 100644 index 77b8713..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */

package org.apache.hadoop.hdds.scm.events;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
    .CloseContainerStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
    .ReplicationStatus;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
    .CloseContainerRetryableReq;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .PipelineReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .ContainerActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .CommandStatusReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
    .NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
    .ReplicationCompleted;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;

import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;

/**
 * Namespace holding every event type used inside SCM. The class only
 * declares constants and cannot be instantiated.
 */
public final class SCMEvents {

  /**
   * Datanodes send NodeReports. SCMDatanodeHeartbeatDispatcher receives the
   * report and generates this NodeReport event.
   */
  public static final TypedEvent<NodeReportFromDatanode>
      NODE_REPORT =
      new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");

  /**
   * Generated when a DataNode registers.
   */
  public static final TypedEvent<NodeRegistrationContainerReport>
      NODE_REGISTRATION_CONT_REPORT =
      new TypedEvent<>(NodeRegistrationContainerReport.class,
          "Node_Registration_Container_Report");

  /**
   * Datanodes send ContainerReports. SCMDatanodeHeartbeatDispatcher receives
   * the report and generates this Container_Report event.
   */
  public static final TypedEvent<ContainerReportFromDatanode>
      CONTAINER_REPORT =
      new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");

  /**
   * Datanodes send ContainerActions. SCMDatanodeHeartbeatDispatcher receives
   * them and generates this CONTAINER_ACTIONS event.
   */
  public static final TypedEvent<ContainerActionsFromDatanode>
      CONTAINER_ACTIONS =
      new TypedEvent<>(ContainerActionsFromDatanode.class,
          "Container_Actions");

  /**
   * Datanodes send PipelineReports. SCMDatanodeHeartbeatDispatcher receives
   * the report and generates this Pipeline_Report event.
   */
  public static final TypedEvent<PipelineReportFromDatanode>
      PIPELINE_REPORT =
      new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");

  /**
   * Datanodes send PipelineActions. SCMDatanodeHeartbeatDispatcher receives
   * them and generates this PIPELINE_ACTIONS event.
   */
  public static final TypedEvent<PipelineActionsFromDatanode>
      PIPELINE_ACTIONS =
      new TypedEvent<>(PipelineActionsFromDatanode.class,
          "Pipeline_Actions");

  /**
   * Triggered to close a pipeline, for instance because of a failure, a
   * stale node or decommissioning.
   */
  public static final TypedEvent<PipelineID>
      PIPELINE_CLOSE =
      new TypedEvent<>(PipelineID.class, "Pipeline_Close");

  /**
   * Datanodes send command status reports. SCMDatanodeHeartbeatDispatcher
   * receives the report and generates this CommandReport event.
   */
  public static final TypedEvent<CommandStatusReportFromDatanode>
      CMD_STATUS_REPORT =
      new TypedEvent<>(CommandStatusReportFromDatanode.class,
          "Cmd_Status_Report");

  /**
   * Generated whenever any component inside SCM needs to issue a command to
   * a Datanode. NodeManager listens to these events and dispatches them to
   * the Datanode for further processing.
   */
  public static final Event<CommandForDatanode>
      DATANODE_COMMAND =
      new TypedEvent<>(CommandForDatanode.class, "Datanode_Command");

  /**
   * Carries the same payload type as {@link #DATANODE_COMMAND} under a
   * separate event name.
   * NOTE(review): which component handles the retry is not visible in this
   * file - confirm against the listeners.
   */
  public static final TypedEvent<CommandForDatanode>
      RETRIABLE_DATANODE_COMMAND =
      new TypedEvent<>(CommandForDatanode.class, "Retriable_Datanode_Command");

  /**
   * A Close Container event can be triggered under several conditions:
   * <p>
   * 1. A container is full, so no further information is written to it. The
   * DN lets SCM know the current state and sends an informational message
   * that allows SCM to close the container.
   * <p>
   * 2. A pipeline is open (for example Ratis) and a single node fails; the
   * containers are then closed proactively.
   * <p>
   * After a command is dispatched to the DN, updates from the datanode are
   * also listened to, telling us whether the command completed or timed
   * out.
   */
  public static final TypedEvent<ContainerID>
      CLOSE_CONTAINER =
      new TypedEvent<>(ContainerID.class, "Close_Container");

  /**
   * Triggered by CloseContainerEventHandler after it has sent a SCMCommand
   * to a DataNode. CloseContainerWatcher tracks this event and is
   * responsible for retrying on failure or timeout.
   */
  public static final TypedEvent<CloseContainerRetryableReq>
      CLOSE_CONTAINER_RETRYABLE_REQ =
      new TypedEvent<>(CloseContainerRetryableReq.class,
          "Close_Container_Retryable");

  /**
   * Triggered whenever a new datanode is registered with SCM.
   */
  public static final TypedEvent<DatanodeDetails>
      NEW_NODE =
      new TypedEvent<>(DatanodeDetails.class, "New_Node");

  /**
   * Triggered whenever a datanode moves from healthy to stale state.
   */
  public static final TypedEvent<DatanodeDetails>
      STALE_NODE =
      new TypedEvent<>(DatanodeDetails.class, "Stale_Node");

  /**
   * Triggered whenever a datanode moves from stale to dead state.
   */
  public static final TypedEvent<DatanodeDetails>
      DEAD_NODE =
      new TypedEvent<>(DatanodeDetails.class, "Dead_Node");

  /**
   * Triggered by CommandStatusReportHandler whenever a status for a
   * Replication SCMCommand is received.
   */
  public static final Event<ReplicationStatus>
      REPLICATION_STATUS =
      new TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");

  /**
   * Triggered by CommandStatusReportHandler whenever a status for a
   * CloseContainer SCMCommand is received.
   */
  public static final Event<CloseContainerStatus>
      CLOSE_CONTAINER_STATUS =
      new TypedEvent<>(CloseContainerStatus.class,
          "Close_Container_Command_Status");

  /**
   * Triggered by CommandStatusReportHandler whenever a status for a
   * DeleteBlock SCMCommand is received.
   */
  public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus>
      DELETE_BLOCK_STATUS =
      new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class,
          "Delete_Block_Status");

  /**
   * Triggered while processing container reports from a DN when the
   * deleteTransactionID of a container in the report does not match the
   * deleteTransactionID on SCM.
   */
  public static final Event<PendingDeleteStatusList>
      PENDING_DELETE_STATUS =
      new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status");

  /**
   * Command for the ReplicationManager to handle under/over replication.
   * Sent by the ContainerReportHandler after processing the heartbeat.
   */
  public static final TypedEvent<ReplicationRequest>
      REPLICATE_CONTAINER =
      new TypedEvent<>(ReplicationRequest.class);

  /**
   * Sent by the ReplicationManager to the ReplicationCommandWatcher to track
   * the in-progress replication.
   */
  public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
      TRACK_REPLICATE_COMMAND =
      new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);

  /**
   * Comes from the Heartbeat dispatcher (in fact from the datanode) to
   * notify the SCM that the replication is done. The replicate command
   * watcher receives it to mark the in-progress task as finished.
   * <p>
   * TODO: Temporary event, should be replaced by specific Heartbeat
   * ActionRequired event.
   */
  public static final TypedEvent<ReplicationCompleted>
      REPLICATION_COMPLETE =
      new TypedEvent<>(ReplicationCompleted.class);

  /**
   * Signal for all the components (but especially for the replication
   * manager and container report handler) that the replication could be
   * started. Should be sent only if (almost) all the container states are
   * available from the datanodes.
   */
  public static final TypedEvent<Boolean>
      START_REPLICATION =
      new TypedEvent<>(Boolean.class);

  /**
   * Boolean-payload event for the chill mode status.
   * NOTE(review): the meaning of the payload value is not visible here.
   * This event and START_REPLICATION are both created from Boolean.class
   * without an explicit name - confirm that TypedEvent keeps them distinct.
   */
  public static final TypedEvent<Boolean>
      CHILL_MODE_STATUS =
      new TypedEvent<>(Boolean.class);

  /**
   * Private Ctor. Never Constructed.
   */
  private SCMEvents() {
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/package-info.java deleted file mode 100644 index 46181a3..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -/** - * Events Package contains all the Events used by SCM internally to - * communicate between different sub-systems that make up SCM. 
- */ -package org.apache.hadoop.hdds.scm.events; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java deleted file mode 100644 index dae0b06..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.exceptions; - -import java.io.IOException; - -/** - * Exception thrown by SCM. - */ -public class SCMException extends IOException { - private final ResultCodes result; - - /** - * Constructs an {@code IOException} with {@code null} - * as its error detail message. - */ - public SCMException(ResultCodes result) { - this.result = result; - } - - /** - * Constructs an {@code IOException} with the specified detail message. 
- * - * @param message The detail message (which is saved for later retrieval by - * the - * {@link #getMessage()} method) - */ - public SCMException(String message, ResultCodes result) { - super(message); - this.result = result; - } - - /** - * Constructs an {@code IOException} with the specified detail message - * and cause. - * <p> - * <p> Note that the detail message associated with {@code cause} is - * <i>not</i> automatically incorporated into this exception's detail - * message. - * - * @param message The detail message (which is saved for later retrieval by - * the - * {@link #getMessage()} method) - * @param cause The cause (which is saved for later retrieval by the {@link - * #getCause()} method). (A null value is permitted, and indicates that the - * cause is nonexistent or unknown.) - * @since 1.6 - */ - public SCMException(String message, Throwable cause, ResultCodes result) { - super(message, cause); - this.result = result; - } - - /** - * Constructs an {@code IOException} with the specified cause and a - * detail message of {@code (cause==null ? null : cause.toString())} - * (which typically contains the class and detail message of {@code cause}). - * This constructor is useful for IO exceptions that are little more - * than wrappers for other throwables. - * - * @param cause The cause (which is saved for later retrieval by the {@link - * #getCause()} method). (A null value is permitted, and indicates that the - * cause is nonexistent or unknown.) - * @since 1.6 - */ - public SCMException(Throwable cause, ResultCodes result) { - super(cause); - this.result = result; - } - - /** - * Returns resultCode. - * @return ResultCode - */ - public ResultCodes getResult() { - return result; - } - - /** - * Error codes to make it easy to decode these exceptions. 
- */ - public enum ResultCodes { - SUCCEESS, - FAILED_TO_LOAD_NODEPOOL, - FAILED_TO_FIND_NODE_IN_POOL, - FAILED_TO_FIND_HEALTHY_NODES, - FAILED_TO_FIND_NODES_WITH_SPACE, - FAILED_TO_FIND_SUITABLE_NODE, - INVALID_CAPACITY, - INVALID_BLOCK_SIZE, - CHILL_MODE_EXCEPTION, - FAILED_TO_LOAD_OPEN_CONTAINER, - FAILED_TO_ALLOCATE_CONTAINER, - FAILED_TO_CHANGE_CONTAINER_STATE, - FAILED_TO_CHANGE_PIPELINE_STATE, - CONTAINER_EXISTS, - FAILED_TO_FIND_CONTAINER, - FAILED_TO_FIND_CONTAINER_WITH_SPACE, - BLOCK_EXISTS, - FAILED_TO_FIND_BLOCK, - IO_EXCEPTION, - UNEXPECTED_CONTAINER_STATE, - SCM_NOT_INITIALIZED, - DUPLICATE_DATANODE, - NO_SUCH_DATANODE, - NO_REPLICA_FOUND, - FAILED_TO_FIND_ACTIVE_PIPELINE - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java deleted file mode 100644 index 7b69310..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.exceptions; -// Exceptions thrown by SCM. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java deleted file mode 100644 index 996478c..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdds.scm.node; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Command Queue is a queue of commands for the datanode. - * <p> - * Node manager, container Manager and Ozone managers can queue commands for - * datanodes into this queue. These commands will be sent in the order in which - * they were queued. - */ -public class CommandQueue { - // This list is used as default return value. - private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>(); - private final Map<UUID, Commands> commandMap; - private final Lock lock; - private long commandsInQueue; - - /** - * Returns number of commands in queue. - * @return Command Count. - */ - public long getCommandsInQueue() { - return commandsInQueue; - } - - /** - * Constructs a Command Queue. - * TODO : Add a flusher thread that throws away commands older than a certain - * time period. - */ - public CommandQueue() { - commandMap = new HashMap<>(); - lock = new ReentrantLock(); - commandsInQueue = 0; - } - - /** - * This function is used only for test purposes. - */ - @VisibleForTesting - public void clear() { - lock.lock(); - try { - commandMap.clear(); - commandsInQueue = 0; - } finally { - lock.unlock(); - } - } - - /** - * Returns a list of Commands for the datanode to execute. If we have no - * commands, returns an empty list; otherwise the current set of - * commands is returned and removed from the command map. - * - * @param datanodeUuid Datanode UUID - * @return List of SCM Commands.
- */ - @SuppressWarnings("unchecked") - List<SCMCommand> getCommand(final UUID datanodeUuid) { - lock.lock(); - try { - Commands cmds = commandMap.remove(datanodeUuid); - List<SCMCommand> cmdList = null; - if(cmds != null) { - cmdList = cmds.getCommands(); - commandsInQueue -= cmdList.size() > 0 ? cmdList.size() : 0; - // A post condition really. - Preconditions.checkState(commandsInQueue >= 0); - } - return cmds == null ? DEFAULT_LIST : cmdList; - } finally { - lock.unlock(); - } - } - - /** - * Adds a Command to the SCM Queue to send the command to container. - * - * @param datanodeUuid DatanodeDetails.Uuid - * @param command - Command - */ - public void addCommand(final UUID datanodeUuid, final SCMCommand - command) { - lock.lock(); - try { - if (commandMap.containsKey(datanodeUuid)) { - commandMap.get(datanodeUuid).add(command); - } else { - commandMap.put(datanodeUuid, new Commands(command)); - } - commandsInQueue++; - } finally { - lock.unlock(); - } - } - - /** - * Class that stores commands for a datanode. - */ - private static class Commands { - private long updateTime; - private long readTime; - private List<SCMCommand> commands; - - /** - * Constructs a Commands class. - */ - Commands() { - commands = new LinkedList<>(); - updateTime = 0; - readTime = 0; - } - - /** - * Creates the object and populates with the command. - * @param command command to add to queue. - */ - Commands(SCMCommand command) { - this(); - this.add(command); - } - - /** - * Gets the last time the commands for this node was updated. - * @return Time stamp - */ - public long getUpdateTime() { - return updateTime; - } - - /** - * Gets the last read time. - * @return last time when these commands were read from this queue. - */ - public long getReadTime() { - return readTime; - } - - /** - * Adds a command to the list. 
- * - * @param command SCMCommand - */ - public void add(SCMCommand command) { - this.commands.add(command); - updateTime = Time.monotonicNow(); - } - - /** - * Returns the commands for this datanode. - * @return command list. - */ - public List<SCMCommand> getCommands() { - List<SCMCommand> temp = this.commands; - this.commands = new LinkedList<>(); - readTime = Time.monotonicNow(); - return temp; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java deleted file mode 100644 index 26b8b95..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdds.scm.node; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.StorageReportProto; -import org.apache.hadoop.util.Time; - -import java.util.List; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * This class extends the primary identifier of a Datanode with ephemeral - * state, eg last reported time, usage information etc. - */ -public class DatanodeInfo extends DatanodeDetails { - - private final ReadWriteLock lock; - - private volatile long lastHeartbeatTime; - private long lastStatsUpdatedTime; - - // If required we can dissect StorageReportProto and store the raw data - private List<StorageReportProto> storageReports; - - /** - * Constructs DatanodeInfo from DatanodeDetails. - * - * @param datanodeDetails Details about the datanode - */ - public DatanodeInfo(DatanodeDetails datanodeDetails) { - super(datanodeDetails); - lock = new ReentrantReadWriteLock(); - lastHeartbeatTime = Time.monotonicNow(); - } - - /** - * Updates the last heartbeat time with current time. - */ - public void updateLastHeartbeatTime() { - try { - lock.writeLock().lock(); - lastHeartbeatTime = Time.monotonicNow(); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Returns the last heartbeat time. - * - * @return last heartbeat time. - */ - public long getLastHeartbeatTime() { - try { - lock.readLock().lock(); - return lastHeartbeatTime; - } finally { - lock.readLock().unlock(); - } - } - - /** - * Updates the datanode storage reports. 
- * - * @param reports list of storage report - */ - public void updateStorageReports(List<StorageReportProto> reports) { - try { - lock.writeLock().lock(); - lastStatsUpdatedTime = Time.monotonicNow(); - storageReports = reports; - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Returns the storage reports associated with this datanode. - * - * @return list of storage report - */ - public List<StorageReportProto> getStorageReports() { - try { - lock.readLock().lock(); - return storageReports; - } finally { - lock.readLock().unlock(); - } - } - - /** - * Returns the last updated time of datanode info. - * @return the last updated time of datanode info. - */ - public long getLastStatsUpdatedTime() { - return lastStatsUpdatedTime; - } - - @Override - public int hashCode() { - return super.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return super.equals(obj); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java deleted file mode 100644 index 17edf9e..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import java.util.Set; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerStateManager; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles Dead Node event. 
- */ -public class DeadNodeHandler implements EventHandler<DatanodeDetails> { - - private final ContainerStateManager containerStateManager; - - private final NodeManager nodeManager; - - private static final Logger LOG = - LoggerFactory.getLogger(DeadNodeHandler.class); - - public DeadNodeHandler(NodeManager nodeManager, - ContainerStateManager containerStateManager) { - this.containerStateManager = containerStateManager; - this.nodeManager = nodeManager; - } - - @Override - public void onMessage(DatanodeDetails datanodeDetails, - EventPublisher publisher) { - nodeManager.processDeadNode(datanodeDetails.getUuid()); - - Set<ContainerID> containers = - nodeManager.getContainers(datanodeDetails.getUuid()); - if (containers == null) { - LOG.info("There's no containers in dead datanode {}, no replica will be" - + " removed from the in-memory state.", datanodeDetails.getUuid()); - return; - } - LOG.info( - "Datanode {} is dead. Removing replications from the in-memory state.", - datanodeDetails.getUuid()); - for (ContainerID container : containers) { - try { - try { - containerStateManager.removeContainerReplica(container, - datanodeDetails); - } catch (SCMException ex) { - LOG.info("DataNode {} doesn't have replica for container {}.", - datanodeDetails.getUuid(), container.getId()); - } - - if (!containerStateManager.isOpen(container)) { - ReplicationRequest replicationRequest = - containerStateManager.checkReplicationState(container); - - if (replicationRequest != null) { - publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - replicationRequest); - } - } - } catch (SCMException e) { - LOG.error("Can't remove container from containerStateMap {}", container - .getId(), e); - } - } - } - - /** - * Returns logger. 
- * */ - public static Logger getLogger() { - return LOG; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java deleted file mode 100644 index 780aa2b..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; - -import java.util.Collections; - -/** - * Handles New Node event. 
- */ -public class NewNodeHandler implements EventHandler<DatanodeDetails> { - - private final NodeManager nodeManager; - - public NewNodeHandler(NodeManager nodeManager) { - this.nodeManager = nodeManager; - } - - @Override - public void onMessage(DatanodeDetails datanodeDetails, - EventPublisher publisher) { - try { - nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(), - Collections.emptySet()); - } catch (SCMException e) { - // TODO: log exception message; the SCMException is silently dropped. - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java deleted file mode 100644 index 0dc1a0c..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hdds.scm.node; - -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.scm.node.states.ReportResult; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; - -import java.io.Closeable; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -/** - * A node manager supports a simple interface for managing a datanode. - * <p/> - * 1. A datanode registers with the NodeManager. - * <p/> - * 2. If the node is allowed to register, we add that to the nodes that we need - * to keep track of. - * <p/> - * 3. A heartbeat is made by the node at a fixed frequency. - * <p/> - * 4. A node can be in any of these 4 states: {HEALTHY, STALE, DEAD, - * DECOMMISSIONED} - * <p/> - * HEALTHY - It is a datanode that is regularly heartbeating us. - * - * STALE - A datanode for which we have missed few heart beats. - * - * DEAD - A datanode that we have not heard from for a while. - * - * DECOMMISSIONED - Someone told us to remove this node from the tracking - * list, by calling removeNode. We will throw away this nodes info soon. 
- */ -public interface NodeManager extends StorageContainerNodeProtocol, - EventHandler<CommandForDatanode>, NodeManagerMXBean, Closeable { - /** - * Removes a data node from the management of this Node Manager. - * - * @param node - DataNode. - * @throws NodeNotFoundException - */ - void removeNode(DatanodeDetails node) throws NodeNotFoundException; - - /** - * Gets all Live Datanodes that is currently communicating with SCM. - * @param nodeState - State of the node - * @return List of Datanodes that are Heartbeating SCM. - */ - List<DatanodeDetails> getNodes(NodeState nodeState); - - /** - * Returns the Number of Datanodes that are communicating with SCM. - * @param nodeState - State of the node - * @return int -- count - */ - int getNodeCount(NodeState nodeState); - - /** - * Get all datanodes known to SCM. - * - * @return List of DatanodeDetails known to SCM. - */ - List<DatanodeDetails> getAllNodes(); - - /** - * Chill mode is the period when node manager waits for a minimum - * configured number of datanodes to report in. This is called chill mode - * to indicate the period before node manager gets into action. - * - * Forcefully exits the chill mode, even if we have not met the minimum - * criteria of the nodes reporting in. - */ - void forceExitChillMode(); - - /** - * Puts the node manager into manual chill mode. - */ - void enterChillMode(); - - /** - * Brings node manager out of manual chill mode. - */ - void exitChillMode(); - - /** - * Returns the aggregated node stats. - * @return the aggregated node stats. - */ - SCMNodeStat getStats(); - - /** - * Return a map of node stats. - * @return a map of individual node stats (live/stale but not dead). - */ - Map<UUID, SCMNodeStat> getNodeStats(); - - /** - * Return the node stat of the specified datanode. - * @param datanodeDetails DatanodeDetails. - * @return node stat if it is live/stale, null if it is decommissioned or - * doesn't exist. 
- */ - SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails); - - /** - * Returns the node state of a specific node. - * @param datanodeDetails DatanodeDetails - * @return Healthy/Stale/Dead. - */ - NodeState getNodeState(DatanodeDetails datanodeDetails); - - /** - * Get set of pipelines a datanode is part of. - * @param dnId - datanodeID - * @return Set of PipelineID - */ - Set<PipelineID> getPipelineByDnID(UUID dnId); - - /** - * Add pipeline information in the NodeManager. - * @param pipeline - Pipeline to be added - */ - void addPipeline(Pipeline pipeline); - - /** - * Remove pipeline information from the NodeManager. - * @param pipeline - Pipeline to be removed - */ - void removePipeline(Pipeline pipeline); - - /** - * Update set of containers available on a datanode. - * @param uuid - DatanodeID - * @param containerIds - Set of containerIDs - * @throws SCMException - if datanode is not known. For new datanode use - * addDatanodeInContainerMap call. - */ - void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) - throws SCMException; - - /** - * Process containerReport received from datanode. - * @param uuid - DatanodeID - * @param containerIds - Set of containerIDs - * @return The result after processing containerReport - */ - ReportResult<ContainerID> processContainerReport(UUID uuid, - Set<ContainerID> containerIds); - - /** - * Return set of containerIDs available on a datanode. - * @param uuid - DatanodeID - * @return - set of containerIDs - */ - Set<ContainerID> getContainers(UUID uuid); - - /** - * Insert a new datanode with set of containerIDs for containers available - * on it. - * @param uuid - DatanodeID - * @param containerIDs - Set of ContainerIDs - * @throws SCMException - if datanode already exists - */ - void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs) - throws SCMException; - - /** - * Add a {@link SCMCommand} to the command queue, which is - * handled by HB thread asynchronously.
- * @param dnId datanode uuid - * @param command - */ - void addDatanodeCommand(UUID dnId, SCMCommand command); - - /** - * Process node report. - * - * @param dnUuid - * @param nodeReport - */ - void processNodeReport(UUID dnUuid, NodeReportProto nodeReport); - - /** - * Process a dead node event in this Node Manager. - * - * @param dnUuid datanode uuid. - */ - void processDeadNode(UUID dnUuid); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java deleted file mode 100644 index 3ac993b..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManagerMXBean.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdds.scm.node; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.util.Map; - -/** - * - * This is the JMX management interface for node manager information. - */ -@InterfaceAudience.Private -public interface NodeManagerMXBean { - /** - * Get the minimum number of nodes to get out of chill mode. - * - * @return the minimum number of nodes - */ - int getMinimumChillModeNodes(); - - /** - * Returns a chill mode status string. - * @return the chill mode status as a String - */ - String getChillModeStatus(); - - - /** - * Returns true if node manager is out of chill mode, else false. - * @return true if out of chill mode, else false - */ - boolean isOutOfChillMode(); - - /** - * Get the number of data nodes in each state. - * - * @return a mapping from state to the number of nodes in that state - */ - Map<String, Integer> getNodeCount(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java deleted file mode 100644 index 331bfed..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .NodeReportFromDatanode; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles Node Reports from datanodes by forwarding them to the NodeManager. - */ -public class NodeReportHandler implements EventHandler<NodeReportFromDatanode> { - - private static final Logger LOGGER = LoggerFactory - .getLogger(NodeReportHandler.class); - private final NodeManager nodeManager; - - public NodeReportHandler(NodeManager nodeManager) { - Preconditions.checkNotNull(nodeManager); - this.nodeManager = nodeManager; - } - - @Override - public void onMessage(NodeReportFromDatanode nodeReportFromDatanode, - EventPublisher publisher) { - Preconditions.checkNotNull(nodeReportFromDatanode); - DatanodeDetails dn = nodeReportFromDatanode.getDatanodeDetails(); - Preconditions.checkNotNull(dn, "NodeReport is " - + "missing DatanodeDetails."); - LOGGER.trace("Processing node report for dn: {}", dn); - nodeManager - .processNodeReport(dn.getUuid(), nodeReportFromDatanode.getReport()); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org