http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java deleted file mode 100644 index 71e17e9..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ /dev/null @@ -1,699 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * <p>http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * <p>Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.hdds.scm.container; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.primitives.Longs; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo; -import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.lease.Lease; -import org.apache.hadoop.ozone.lease.LeaseException; -import org.apache.hadoop.ozone.lease.LeaseManager; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import 
org.apache.hadoop.utils.MetadataStoreBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_SIZE_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_SIZE; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_CHANGE_CONTAINER_STATE; -import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; -import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; - -/** - * Mapping class contains the mapping from a name to a pipeline mapping. This - * is used by SCM when - * allocating new locations and when looking up a key. - */ -public class ContainerMapping implements Mapping { - private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping - .class); - - private final NodeManager nodeManager; - private final long cacheSize; - private final Lock lock; - private final Charset encoding = Charset.forName("UTF-8"); - private final MetadataStore containerStore; - private final PipelineSelector pipelineSelector; - private final ContainerStateManager containerStateManager; - private final LeaseManager<ContainerInfo> containerLeaseManager; - private final EventPublisher eventPublisher; - private final long size; - - /** - * Constructs a mapping class that creates mapping between container names - * and pipelines. - * - * @param nodeManager - NodeManager so that we can get the nodes that are - * healthy to place new - * containers. - * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache - * its nodes. 
This is - * passed to LevelDB and this memory is allocated in Native code space. - * CacheSize is specified - * in MB. - * @throws IOException on Failure. - */ - @SuppressWarnings("unchecked") - public ContainerMapping( - final Configuration conf, final NodeManager nodeManager, final int - cacheSizeMB, EventPublisher eventPublisher) throws IOException { - this.nodeManager = nodeManager; - this.cacheSize = cacheSizeMB; - - File metaDir = getOzoneMetaDirPath(conf); - - // Write the container name to pipeline mapping. - File containerDBPath = new File(metaDir, SCM_CONTAINER_DB); - containerStore = - MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(containerDBPath) - .setCacheSize(this.cacheSize * OzoneConsts.MB) - .build(); - - this.lock = new ReentrantLock(); - - size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, - OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - - this.pipelineSelector = new PipelineSelector(nodeManager, - conf, eventPublisher, cacheSizeMB); - - this.containerStateManager = - new ContainerStateManager(conf, this, pipelineSelector); - LOG.trace("Container State Manager created."); - - this.eventPublisher = eventPublisher; - - long containerCreationLeaseTimeout = conf.getTimeDuration( - ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, - ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - containerLeaseManager = new LeaseManager<>("ContainerCreation", - containerCreationLeaseTimeout); - containerLeaseManager.start(); - } - - /** - * {@inheritDoc} - */ - @Override - public ContainerInfo getContainer(final long containerID) throws - IOException { - ContainerInfo containerInfo; - lock.lock(); - try { - byte[] containerBytes = containerStore.get( - Longs.toByteArray(containerID)); - if (containerBytes == null) { - throw new SCMException( - "Specified key does not exist. 
key : " + containerID, - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - - HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes); - containerInfo = ContainerInfo.fromProtobuf(temp); - return containerInfo; - } finally { - lock.unlock(); - } - } - - /** - * Returns the ContainerInfo and pipeline from the containerID. If container - * has no available replicas in datanodes it returns pipeline with no - * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for - * an empty pipeline. - * - * @param containerID - ID of container. - * @return - ContainerWithPipeline such as creation state and the pipeline. - * @throws IOException - */ - @Override - public ContainerWithPipeline getContainerWithPipeline(long containerID) - throws IOException { - ContainerInfo contInfo; - lock.lock(); - try { - byte[] containerBytes = containerStore.get( - Longs.toByteArray(containerID)); - if (containerBytes == null) { - throw new SCMException( - "Specified key does not exist. 
key : " + containerID, - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes); - contInfo = ContainerInfo.fromProtobuf(temp); - - Pipeline pipeline; - String leaderId = ""; - if (contInfo.isContainerOpen()) { - // If pipeline with given pipeline Id already exist return it - pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); - } else { - // For close containers create pipeline from datanodes with replicas - Set<DatanodeDetails> dnWithReplicas = containerStateManager - .getContainerReplicas(contInfo.containerID()); - if (!dnWithReplicas.isEmpty()) { - leaderId = dnWithReplicas.iterator().next().getUuidString(); - } - pipeline = new Pipeline(leaderId, contInfo.getState(), - ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(), - PipelineID.randomId()); - dnWithReplicas.forEach(pipeline::addMember); - } - return new ContainerWithPipeline(contInfo, pipeline); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<ContainerInfo> listContainer(long startContainerID, - int count) throws IOException { - List<ContainerInfo> containerList = new ArrayList<>(); - lock.lock(); - try { - if (containerStore.isEmpty()) { - throw new IOException("No container exists in current db"); - } - byte[] startKey = startContainerID <= 0 ? null : - Longs.toByteArray(startContainerID); - List<Map.Entry<byte[], byte[]>> range = - containerStore.getSequentialRangeKVs(startKey, count, null); - - // Transform the values into the pipelines. 
- // TODO: filter by container state - for (Map.Entry<byte[], byte[]> entry : range) { - ContainerInfo containerInfo = - ContainerInfo.fromProtobuf( - HddsProtos.SCMContainerInfo.PARSER.parseFrom( - entry.getValue())); - Preconditions.checkNotNull(containerInfo); - containerList.add(containerInfo); - } - } finally { - lock.unlock(); - } - return containerList; - } - - /** - * Allocates a new container. - * - * @param replicationFactor - replication factor of the container. - * @param owner - The string name of the Service that owns this container. - * @return - Pipeline that makes up this container. - * @throws IOException - Exception - */ - @Override - public ContainerWithPipeline allocateContainer( - ReplicationType type, - ReplicationFactor replicationFactor, - String owner) - throws IOException { - - ContainerInfo containerInfo; - ContainerWithPipeline containerWithPipeline; - - lock.lock(); - try { - containerWithPipeline = containerStateManager.allocateContainer( - pipelineSelector, type, replicationFactor, owner); - containerInfo = containerWithPipeline.getContainerInfo(); - - byte[] containerIDBytes = Longs.toByteArray( - containerInfo.getContainerID()); - containerStore.put(containerIDBytes, containerInfo.getProtobuf() - .toByteArray()); - } finally { - lock.unlock(); - } - return containerWithPipeline; - } - - /** - * Deletes a container from SCM. - * - * @param containerID - Container ID - * @throws IOException if container doesn't exist or container store failed - * to delete the - * specified key. 
- */ - @Override - public void deleteContainer(long containerID) throws IOException { - lock.lock(); - try { - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to delete container " + containerID + ", reason : " + - "container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - containerStore.delete(dbKey); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} Used by client to update container state on SCM. - */ - @Override - public HddsProtos.LifeCycleState updateContainerState( - long containerID, HddsProtos.LifeCycleEvent event) throws - IOException { - ContainerInfo containerInfo; - lock.lock(); - try { - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to update container state" - + containerID - + ", reason : container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - containerInfo = - ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes)); - - Preconditions.checkNotNull(containerInfo); - switch (event) { - case CREATE: - // Acquire lease on container - Lease<ContainerInfo> containerLease = - containerLeaseManager.acquire(containerInfo); - // Register callback to be executed in case of timeout - containerLease.registerCallBack(() -> { - updateContainerState(containerID, - HddsProtos.LifeCycleEvent.TIMEOUT); - return null; - }); - break; - case CREATED: - // Release the lease on container - containerLeaseManager.release(containerInfo); - break; - case FINALIZE: - // TODO: we don't need a lease manager here for closing as the - // container report will include the container state after HDFS-13008 - // If a client failed to update the container close state, DN container - // report from 3 DNs will be used to close the container 
eventually. - break; - case CLOSE: - break; - case UPDATE: - break; - case DELETE: - break; - case TIMEOUT: - break; - case CLEANUP: - break; - default: - throw new SCMException("Unsupported container LifeCycleEvent.", - FAILED_TO_CHANGE_CONTAINER_STATE); - } - // If the below updateContainerState call fails, we should revert the - // changes made in switch case. - // Like releasing the lease in case of BEGIN_CREATE. - ContainerInfo updatedContainer = containerStateManager - .updateContainerState(containerInfo, event); - if (!updatedContainer.isContainerOpen()) { - pipelineSelector.removeContainerFromPipeline( - containerInfo.getPipelineID(), containerID); - } - containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); - return updatedContainer.getState(); - } catch (LeaseException e) { - throw new IOException("Lease Exception.", e); - } finally { - lock.unlock(); - } - } - - /** - * Update deleteTransactionId according to deleteTransactionMap. - * - * @param deleteTransactionMap Maps the containerId to latest delete - * transaction id for the container. 
- * @throws IOException - */ - public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) - throws IOException { - if (deleteTransactionMap == null) { - return; - } - - lock.lock(); - try { - BatchOperation batch = new BatchOperation(); - for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) { - long containerID = entry.getKey(); - byte[] dbKey = Longs.toByteArray(containerID); - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes == null) { - throw new SCMException( - "Failed to increment number of deleted blocks for container " - + containerID + ", reason : " + "container doesn't exist.", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - ContainerInfo containerInfo = ContainerInfo.fromProtobuf( - HddsProtos.SCMContainerInfo.parseFrom(containerBytes)); - containerInfo.updateDeleteTransactionId(entry.getValue()); - batch.put(dbKey, containerInfo.getProtobuf().toByteArray()); - } - containerStore.writeBatch(batch); - containerStateManager - .updateDeleteTransactionId(deleteTransactionMap); - } finally { - lock.unlock(); - } - } - - /** - * Returns the container State Manager. - * - * @return ContainerStateManager - */ - @Override - public ContainerStateManager getStateManager() { - return containerStateManager; - } - - /** - * Return a container matching the attributes specified. - * - * @param sizeRequired - Space needed in the Container. - * @param owner - Owner of the container - A specific nameservice. - * @param type - Replication Type {StandAlone, Ratis} - * @param factor - Replication Factor {ONE, THREE} - * @param state - State of the Container-- {Open, Allocated etc.} - * @return ContainerInfo, null if there is no match found. 
- */ - public ContainerWithPipeline getMatchingContainerWithPipeline( - final long sizeRequired, String owner, ReplicationType type, - ReplicationFactor factor, LifeCycleState state) throws IOException { - ContainerInfo containerInfo = getStateManager() - .getMatchingContainer(sizeRequired, owner, type, factor, state); - if (containerInfo == null) { - return null; - } - Pipeline pipeline = pipelineSelector - .getPipeline(containerInfo.getPipelineID()); - return new ContainerWithPipeline(containerInfo, pipeline); - } - - /** - * Process container report from Datanode. - * <p> - * Processing follows a very simple logic for time being. - * <p> - * 1. Datanodes report the current State -- denoted by the datanodeState - * <p> - * 2. We are the older SCM state from the Database -- denoted by - * the knownState. - * <p> - * 3. We copy the usage etc. from currentState to newState and log that - * newState to the DB. This allows us SCM to bootup again and read the - * state of the world from the DB, and then reconcile the state from - * container reports, when they arrive. - * - * @param reports Container report - */ - @Override - public void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports, boolean isRegisterCall) - throws IOException { - List<StorageContainerDatanodeProtocolProtos.ContainerInfo> - containerInfos = reports.getReportsList(); - PendingDeleteStatusList pendingDeleteStatusList = - new PendingDeleteStatusList(datanodeDetails); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo : - containerInfos) { - // Update replica info during registration process. - if (isRegisterCall) { - try { - getStateManager().addContainerReplica(ContainerID. - valueof(contInfo.getContainerID()), datanodeDetails); - } catch (Exception ex) { - // Continue to next one after logging the error. 
- LOG.error("Error while adding replica for containerId {}.", - contInfo.getContainerID(), ex); - } - } - byte[] dbKey = Longs.toByteArray(contInfo.getContainerID()); - lock.lock(); - try { - byte[] containerBytes = containerStore.get(dbKey); - if (containerBytes != null) { - HddsProtos.SCMContainerInfo knownState = - HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); - - if (knownState.getState() == LifeCycleState.CLOSING - && contInfo.getState() == LifeCycleState.CLOSED) { - - updateContainerState(contInfo.getContainerID(), - LifeCycleEvent.CLOSE); - - //reread the container - knownState = - HddsProtos.SCMContainerInfo.PARSER - .parseFrom(containerStore.get(dbKey)); - } - - HddsProtos.SCMContainerInfo newState = - reconcileState(contInfo, knownState, datanodeDetails); - - if (knownState.getDeleteTransactionId() > contInfo - .getDeleteTransactionId()) { - pendingDeleteStatusList - .addPendingDeleteStatus(contInfo.getDeleteTransactionId(), - knownState.getDeleteTransactionId(), - knownState.getContainerID()); - } - - // FIX ME: This can be optimized, we write twice to memory, where a - // single write would work well. - // - // We need to write this to DB again since the closed only write - // the updated State. - containerStore.put(dbKey, newState.toByteArray()); - - } else { - // Container not found in our container db. - LOG.error("Error while processing container report from datanode :" + - " {}, for container: {}, reason: container doesn't exist in" + - "container database.", datanodeDetails, - contInfo.getContainerID()); - } - } finally { - lock.unlock(); - } - } - if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { - eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS, - pendingDeleteStatusList); - } - - } - - /** - * Reconciles the state from Datanode with the state in SCM. - * - * @param datanodeState - State from the Datanode. - * @param knownState - State inside SCM. - * @param dnDetails - * @return new SCM State for this container. 
- */ - private HddsProtos.SCMContainerInfo reconcileState( - StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState, - SCMContainerInfo knownState, DatanodeDetails dnDetails) { - HddsProtos.SCMContainerInfo.Builder builder = - HddsProtos.SCMContainerInfo.newBuilder(); - builder.setContainerID(knownState.getContainerID()) - .setPipelineID(knownState.getPipelineID()) - .setReplicationType(knownState.getReplicationType()) - .setReplicationFactor(knownState.getReplicationFactor()); - - // TODO: If current state doesn't have this DN in list of DataNodes with - // replica then add it in list of replicas. - - // If used size is greater than allocated size, we will be updating - // allocated size with used size. This update is done as a fallback - // mechanism in case SCM crashes without properly updating allocated - // size. Correct allocated value will be updated by - // ContainerStateManager during SCM shutdown. - long usedSize = datanodeState.getUsed(); - long allocated = knownState.getAllocatedBytes() > usedSize ? - knownState.getAllocatedBytes() : usedSize; - builder.setAllocatedBytes(allocated) - .setUsedBytes(usedSize) - .setNumberOfKeys(datanodeState.getKeyCount()) - .setState(knownState.getState()) - .setStateEnterTime(knownState.getStateEnterTime()) - .setContainerID(knownState.getContainerID()) - .setDeleteTransactionId(knownState.getDeleteTransactionId()); - if (knownState.getOwner() != null) { - builder.setOwner(knownState.getOwner()); - } - return builder.build(); - } - - - /** - * In Container is in closed state, if it is in closed, Deleting or Deleted - * State. - * - * @param info - ContainerInfo. 
- * @return true if is in open state, false otherwise - */ - private boolean shouldClose(ContainerInfo info) { - return info.getState() == HddsProtos.LifeCycleState.OPEN; - } - - private boolean isClosed(ContainerInfo info) { - return info.getState() == HddsProtos.LifeCycleState.CLOSED; - } - - /** - * Closes this stream and releases any system resources associated with it. - * If the stream is - * already closed then invoking this method has no effect. - * <p> - * <p>As noted in {@link AutoCloseable#close()}, cases where the close may - * fail require careful - * attention. It is strongly advised to relinquish the underlying resources - * and to internally - * <em>mark</em> the {@code Closeable} as closed, prior to throwing the - * {@code IOException}. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - if (containerLeaseManager != null) { - containerLeaseManager.shutdown(); - } - if (containerStateManager != null) { - flushContainerInfo(); - containerStateManager.close(); - } - if (containerStore != null) { - containerStore.close(); - } - - if (pipelineSelector != null) { - pipelineSelector.shutdown(); - } - } - - /** - * Since allocatedBytes of a container is only in memory, stored in - * containerStateManager, when closing ContainerMapping, we need to update - * this in the container store. - * - * @throws IOException on failure. 
- */ - @VisibleForTesting - public void flushContainerInfo() throws IOException { - List<ContainerInfo> containers = containerStateManager.getAllContainers(); - List<Long> failedContainers = new ArrayList<>(); - for (ContainerInfo info : containers) { - // even if some container updated failed, others can still proceed - try { - byte[] dbKey = Longs.toByteArray(info.getContainerID()); - byte[] containerBytes = containerStore.get(dbKey); - // TODO : looks like when a container is deleted, the container is - // removed from containerStore but not containerStateManager, so it can - // return info of a deleted container. may revisit this in the future, - // for now, just skip a not-found container - if (containerBytes != null) { - containerStore.put(dbKey, info.getProtobuf().toByteArray()); - } else { - LOG.debug("Container state manager has container {} but not found " + - "in container store, a deleted container?", - info.getContainerID()); - } - } catch (IOException ioe) { - failedContainers.add(info.getContainerID()); - } - } - if (!failedContainers.isEmpty()) { - throw new IOException("Error in flushing container info from container " + - "state manager: " + failedContainers); - } - } - - @VisibleForTesting - public MetadataStore getContainerStore() { - return containerStore; - } - - public PipelineSelector getPipelineSelector() { - return pipelineSelector; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java deleted file mode 100644 index 71935f0..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdds.scm.container; - -import java.io.IOException; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.states.ReportResult; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles container reports from datanode. 
- */ -public class ContainerReportHandler implements - EventHandler<ContainerReportFromDatanode> { - - private static final Logger LOG = - LoggerFactory.getLogger(ContainerReportHandler.class); - - private final NodeManager nodeManager; - - private final Mapping containerMapping; - - private ContainerStateManager containerStateManager; - - private ReplicationActivityStatus replicationStatus; - - public ContainerReportHandler(Mapping containerMapping, - NodeManager nodeManager, - ReplicationActivityStatus replicationActivityStatus) { - Preconditions.checkNotNull(containerMapping); - Preconditions.checkNotNull(nodeManager); - Preconditions.checkNotNull(replicationActivityStatus); - this.containerStateManager = containerMapping.getStateManager(); - this.nodeManager = nodeManager; - this.containerMapping = containerMapping; - this.replicationStatus = replicationActivityStatus; - } - - @Override - public void onMessage(ContainerReportFromDatanode containerReportFromDatanode, - EventPublisher publisher) { - - DatanodeDetails datanodeOrigin = - containerReportFromDatanode.getDatanodeDetails(); - - ContainerReportsProto containerReport = - containerReportFromDatanode.getReport(); - try { - - //update state in container db and trigger close container events - containerMapping - .processContainerReports(datanodeOrigin, containerReport, false); - - Set<ContainerID> containerIds = containerReport.getReportsList().stream() - .map(StorageContainerDatanodeProtocolProtos - .ContainerInfo::getContainerID) - .map(ContainerID::new) - .collect(Collectors.toSet()); - - ReportResult<ContainerID> reportResult = nodeManager - .processContainerReport(datanodeOrigin.getUuid(), containerIds); - - //we have the report, so we can update the states for the next iteration. 
- nodeManager - .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds); - - for (ContainerID containerID : reportResult.getMissingEntries()) { - containerStateManager - .removeContainerReplica(containerID, datanodeOrigin); - checkReplicationState(containerID, publisher); - } - - for (ContainerID containerID : reportResult.getNewEntries()) { - containerStateManager.addContainerReplica(containerID, datanodeOrigin); - checkReplicationState(containerID, publisher); - } - - } catch (IOException e) { - //TODO: stop all the replication? - LOG.error("Error on processing container report from datanode {}", - datanodeOrigin, e); - } - - } - - private void checkReplicationState(ContainerID containerID, - EventPublisher publisher) - throws SCMException { - ContainerInfo container = containerStateManager.getContainer(containerID); - - if (container == null) { - //warning unknown container - LOG.warn( - "Container is missing from containerStateManager. Can't request " - + "replication. {}", - containerID); - return; - } - if (container.isContainerOpen()) { - return; - } - - ReplicationRequest replicationState = - containerStateManager.checkReplicationState(containerID); - if (replicationState != null) { - if (replicationStatus.isReplicationEnabled()) { - publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - replicationState); - } else { - LOG.warn( - "Over/under replicated container but the replication is not " - + "(yet) enabled: " - + replicationState.toString()); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java deleted file mode 100644 index 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.hdds.scm.container;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.common.statemachine
    .InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.util.Time;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
    .FAILED_TO_CHANGE_CONTAINER_STATE;

/**
 * A container state manager keeps track of container states and returns
 * containers that match various queries.
 * <p>
 * This state machine is driven by a combination of server and client actions.
 * <p>
 * This is how a create container happens: 1. When a container is created, the
 * Server(or SCM) marks that Container as ALLOCATED state. In this state, SCM
 * has chosen a pipeline for container to live on. However, the container is not
 * created yet. This container along with the pipeline is returned to the
 * client.
 * <p>
 * 2. The client when it sees the Container state as ALLOCATED understands that
 * container needs to be created on the specified pipeline. The client lets the
 * SCM know that it saw this flag and is initiating the creation on the data
 * nodes.
 * <p>
 * This is done by calling into notifyObjectCreation(ContainerName,
 * BEGIN_CREATE) flag. When SCM gets this call, SCM puts the container state
 * into CREATING. All this state means is that SCM told Client to create a
 * container and client saw that request.
 * <p>
 * 3. Then client makes calls to datanodes directly, asking the datanodes to
 * create the container. This is done with the help of pipeline that supports
 * this container.
 * <p>
 * 4. Once the creation of the container is complete, the client will make
 * another call to the SCM, this time specifying the containerName and the
 * COMPLETE_CREATE as the Event.
 * <p>
 * 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
 * the state when clients can write to a container.
 * <p>
 * 6. If the client does not respond with the COMPLETE_CREATE event within a
 * certain time, the state machine times out and triggers a delete operation of
 * the container.
 * <p>
 * Please see the function initializeStateMachine below to see how this looks in
 * code.
 * <p>
 * Reusing existing container :
 * <p>
 * The create container call is not made all the time, the system tries to use
 * open containers as much as possible. So in those cases, it looks through the
 * list of open containers and will return containers that match the specific
 * signature.
 * <p>
 * Please note : Logically there are 3 separate state machines in the case of
 * containers.
 * <p>
 * The Create State Machine -- Commented extensively above.
 * <p>
 * Open/Close State Machine - Once the container is in the Open State,
 * eventually it will be closed, once sufficient data has been written to it.
 * <p>
 * TimeOut Delete Container State Machine - if the container creation times out,
 * then Container State manager decides to delete the container.
 */
public class ContainerStateManager implements Closeable {
  private static final Logger LOG =
      LoggerFactory.getLogger(ContainerStateManager.class);

  private final StateMachine<HddsProtos.LifeCycleState,
      HddsProtos.LifeCycleEvent> stateMachine;

  private final long containerSize;
  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
  private final ContainerStateMap containers;
  private final AtomicLong containerCount;

  /**
   * Constructs a Container State Manager that tracks all containers owned by
   * SCM for the purpose of allocation of blocks.
   * <p>
   * TODO : Add Container Tags so we know which containers are owned by SCM.
   *
   * @param configuration    source of the configured container size.
   * @param containerMapping mapping used to list the already existing
   *                         containers that are loaded into this manager.
   * @param pipelineSelector selector that is told which pipeline each loaded
   *                         container belongs to.
   */
  public ContainerStateManager(Configuration configuration,
      Mapping containerMapping, PipelineSelector pipelineSelector) {

    // Initialize the container state machine.
    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();

    // These are the steady states of a container.
    finalStates.add(LifeCycleState.OPEN);
    finalStates.add(LifeCycleState.CLOSED);
    finalStates.add(LifeCycleState.DELETED);

    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
        finalStates);
    initializeStateMachine();

    this.containerSize = (long) configuration.getStorageSize(
        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
        StorageUnit.BYTES);

    lastUsedMap = new ConcurrentHashMap<>();
    containerCount = new AtomicLong(0);
    containers = new ContainerStateMap();
    loadExistingContainers(containerMapping, pipelineSelector);
  }

  /**
   * Loads every container returned by the mapping into the in-memory state
   * map, registers it with its pipeline and advances the container counter to
   * the highest container ID seen.
   *
   * @param containerMapping mapping that lists the existing containers.
   * @param pipelineSelector selector that records the container per pipeline.
   */
  private void loadExistingContainers(Mapping containerMapping,
      PipelineSelector pipelineSelector) {

    List<ContainerInfo> containerList;
    try {
      containerList = containerMapping.listContainer(0, Integer.MAX_VALUE);

      // if there are no container to load, let us return.
      if (containerList == null || containerList.isEmpty()) {
        LOG.info("No containers to load for this cluster.");
        return;
      }
    } catch (IOException e) {
      // The message of an IOException may be null, so the constant goes first
      // to avoid a NullPointerException while handling the failure.
      if (!"No container exists in current db".equals(e.getMessage())) {
        LOG.error("Could not list the containers", e);
      }
      return;
    }

    try {
      long maxID = 0;
      for (ContainerInfo container : containerList) {
        containers.addContainer(container);
        pipelineSelector.addContainerToPipeline(
            container.getPipelineID(), container.getContainerID());

        // Publish the counter as soon as a larger ID has been loaded, so the
        // IDs loaded so far stay reserved even if a later container fails.
        if (maxID < container.getContainerID()) {
          maxID = container.getContainerID();
          containerCount.set(maxID);
        }
      }
    } catch (SCMException ex) {
      LOG.error("Unable to create a container information. ", ex);
      // Fix me, what is the proper shutdown procedure for SCM ??
      // System.exit(1) // Should we exit here?
    }
  }

  /**
   * Return the info of all the containers kept by the in-memory mapping.
   *
   * @return the list of all container info.
   */
  public List<ContainerInfo> getAllContainers() {
    List<ContainerInfo> list = new ArrayList<>();

    //No Locking needed since the return value is an immutable map.
    containers.getContainerMap().forEach((key, value) -> list.add(value));
    return list;
  }

  /*
   *
   * Event and State Transition Mapping:
   *
   * State: ALLOCATED ---------------> CREATING
   * Event:                CREATE
   *
   * State: CREATING  ---------------> OPEN
   * Event:               CREATED
   *
   * State: OPEN      ---------------> CLOSING
   * Event:               FINALIZE
   *
   * State: CLOSING   ---------------> CLOSED
   * Event:                CLOSE
   *
   * State: CLOSED   ----------------> DELETING
   * Event:                DELETE
   *
   * State: DELETING ----------------> DELETED
   * Event:               CLEANUP
   *
   * State: CREATING  ---------------> DELETING
   * Event:               TIMEOUT
   *
   *
   * Container State Flow:
   *
   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]------->[CLOSED]
   *          (CREATE)    |   (CREATED)    (FINALIZE)         (CLOSE)     |
   *                      |                                               |
   *                      |(TIMEOUT)                              (DELETE)|
   *                      |                                               |
   *                      +----------------> [DELETING] <-----------------+
   *                                              |
   *                                     (CLEANUP)|
   *                                              |
   *                                          [DELETED]
   */
  private void initializeStateMachine() {
    stateMachine.addTransition(LifeCycleState.ALLOCATED,
        LifeCycleState.CREATING,
        LifeCycleEvent.CREATE);

    stateMachine.addTransition(LifeCycleState.CREATING,
        LifeCycleState.OPEN,
        LifeCycleEvent.CREATED);

    stateMachine.addTransition(LifeCycleState.OPEN,
        LifeCycleState.CLOSING,
        LifeCycleEvent.FINALIZE);

    stateMachine.addTransition(LifeCycleState.CLOSING,
        LifeCycleState.CLOSED,
        LifeCycleEvent.CLOSE);

    stateMachine.addTransition(LifeCycleState.CLOSED,
        LifeCycleState.DELETING,
        LifeCycleEvent.DELETE);

    stateMachine.addTransition(LifeCycleState.CREATING,
        LifeCycleState.DELETING,
        LifeCycleEvent.TIMEOUT);

    stateMachine.addTransition(LifeCycleState.DELETING,
        LifeCycleState.DELETED,
        LifeCycleEvent.CLEANUP);
  }

  /**
   * Allocates a new container based on the type, replication etc.
   *
   * @param selector -- Pipeline selector class.
   * @param type -- Replication type.
   * @param replicationFactor - Replication replicationFactor.
   * @param owner - Owner recorded on the new container.
   * @return ContainerWithPipeline
   * @throws IOException on Failure.
   */
  public ContainerWithPipeline allocateContainer(PipelineSelector selector,
      HddsProtos.ReplicationType type,
      HddsProtos.ReplicationFactor replicationFactor, String owner)
      throws IOException {

    Pipeline pipeline = selector.getReplicationPipeline(type,
        replicationFactor);

    Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
        + "replication=%s couldn't be found for the new container. "
        + "Do you have enough nodes?", type, replicationFactor);

    long containerID = containerCount.incrementAndGet();
    ContainerInfo containerInfo = new ContainerInfo.Builder()
        .setState(HddsProtos.LifeCycleState.ALLOCATED)
        .setPipelineID(pipeline.getId())
        // This is bytes allocated for blocks inside container, not the
        // container size
        .setAllocatedBytes(0)
        .setUsedBytes(0)
        .setNumberOfKeys(0)
        .setStateEnterTime(Time.monotonicNow())
        .setOwner(owner)
        .setContainerID(containerID)
        .setDeleteTransactionId(0)
        .setReplicationFactor(replicationFactor)
        .setReplicationType(pipeline.getType())
        .build();
    // Validate the freshly built info before anything is registered with it.
    Preconditions.checkNotNull(containerInfo);
    selector.addContainerToPipeline(pipeline.getId(), containerID);
    containers.addContainer(containerInfo);
    LOG.trace("New container allocated: {}", containerInfo);
    return new ContainerWithPipeline(containerInfo, pipeline);
  }

  /**
   * Update the Container State to the next state.
   *
   * @param info - ContainerInfo
   * @param event - LifeCycle Event
   * @return Updated ContainerInfo.
   * @throws SCMException if the event is not a valid transition from the
   *                      container's current state, or the update fails.
   */
  public ContainerInfo updateContainerState(ContainerInfo
      info, HddsProtos.LifeCycleEvent event) throws SCMException {
    LifeCycleState newState;
    try {
      newState = this.stateMachine.getNextState(info.getState(), event);
    } catch (InvalidStateTransitionException ex) {
      String error = String.format("Failed to update container state %s, " +
              "reason: invalid state transition from state: %s upon " +
              "event: %s.",
          info.getContainerID(), info.getState(), event);
      LOG.error(error);
      throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
    }

    // This is a post condition after executing getNextState.
    Preconditions.checkNotNull(newState);
    containers.updateState(info, info.getState(), newState);
    return containers.getContainerInfo(info);
  }

  /**
   * Update the container info.
   *
   * @param info - Container Info
   * @return ContainerInfo as stored after the update.
   * @throws SCMException - on Error.
   */
  public ContainerInfo updateContainerInfo(ContainerInfo info)
      throws SCMException {
    containers.updateContainerInfo(info);
    return containers.getContainerInfo(info);
  }

  /**
   * Update deleteTransactionId for a container. Entries whose container is
   * not present in the in-memory map are skipped with a warning.
   *
   * @param deleteTransactionMap maps containerId to its new
   *                             deleteTransactionID
   */
  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) {
    for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
      ContainerInfo info = containers.getContainerMap()
          .get(ContainerID.valueof(entry.getKey()));
      if (info == null) {
        // A lookup miss would otherwise abort the whole batch with a
        // NullPointerException after some entries were already applied.
        LOG.warn("Cannot update deleteTransactionId, container {} is not "
            + "known to the container state manager.", entry.getKey());
        continue;
      }
      info.updateDeleteTransactionId(entry.getValue());
    }
  }

  /**
   * Return a container matching the attributes specified. Candidates are
   * searched starting after the container that was handed out last for the
   * same owner/type/factor, wrapping around to the beginning of the matching
   * set. The allocated bytes of the selected container are increased by
   * {@code size}.
   *
   * @param size - Space needed in the Container.
   * @param owner - Owner of the container - A specific nameservice.
   * @param type - Replication Type {StandAlone, Ratis}
   * @param factor - Replication Factor {ONE, THREE}
   * @param state - State of the Container-- {Open, Allocated etc.}
   * @return ContainerInfo, null if there is no match found.
   */
  public ContainerInfo getMatchingContainer(final long size,
      String owner, ReplicationType type, ReplicationFactor factor,
      LifeCycleState state) {

    // Find containers that match the query spec, if no match return null.
    NavigableSet<ContainerID> matchingSet =
        containers.getMatchingContainerIDs(state, owner, factor, type);
    if (matchingSet == null || matchingSet.isEmpty()) {
      return null;
    }

    // Get the last used container and find container above the last used
    // container ID.
    ContainerState key = new ContainerState(owner, type, factor);
    ContainerID lastID = lastUsedMap.get(key);
    if (lastID == null) {
      lastID = matchingSet.first();
    }

    // There is a small issue here. The first time, we will skip the first
    // container. But in most cases it will not matter.
    NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
    if (resultSet.isEmpty()) {
      resultSet = matchingSet;
    }

    ContainerInfo selectedContainer =
        findContainerWithSpace(size, resultSet, owner);
    if (selectedContainer == null) {

      // If we did not find any space in the tailSet, we need to look for
      // space in the headset, we need to pass true to deal with the
      // situation that we have a lone container that has space. That is we
      // ignored the last used container under the assumption we can find
      // other containers with space, but if have a single container that is
      // not true. Hence we need to include the last used container as the
      // last element in the sorted set.

      resultSet = matchingSet.headSet(lastID, true);
      selectedContainer = findContainerWithSpace(size, resultSet, owner);
    }
    // Update the allocated Bytes on this container.
    if (selectedContainer != null) {
      selectedContainer.updateAllocatedBytes(size);
    }
    return selectedContainer;

  }

  /**
   * Returns the first container of the search set whose allocated bytes plus
   * {@code size} still fit into the configured container size, and remembers
   * it as the last used container for its owner/type/factor.
   *
   * @param size - Space needed in the Container.
   * @param searchSet - Candidate container IDs, visited in set order.
   * @param owner - Owner used for the last-used bookkeeping key.
   * @return the selected ContainerInfo, null if no candidate has enough space.
   */
  private ContainerInfo findContainerWithSpace(long size,
      NavigableSet<ContainerID> searchSet, String owner) {
    // Get the container with space to meet our request.
    for (ContainerID id : searchSet) {
      ContainerInfo containerInfo = containers.getContainerInfo(id);
      if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
        containerInfo.updateLastUsedTime();

        ContainerState key = new ContainerState(owner,
            containerInfo.getReplicationType(),
            containerInfo.getReplicationFactor());
        lastUsedMap.put(key, containerInfo.containerID());
        return containerInfo;
      }
    }
    return null;
  }

  /**
   * Returns a set of ContainerIDs that match the Container.
   *
   * @param owner  Owner of the Containers.
   * @param type - Replication Type of the containers
   * @param factor - Replication factor of the containers.
   * @param state - Current State, like Open, Close etc.
   * @return Set of containers that match the specific query parameters.
   */
  public NavigableSet<ContainerID> getMatchingContainerIDs(
      String owner, ReplicationType type, ReplicationFactor factor,
      LifeCycleState state) {
    return containers.getMatchingContainerIDs(state, owner,
        factor, type);
  }

  /**
   * Returns the containerInfo with pipeline for the given container id.
   *
   * @param selector -- Pipeline selector class.
   * @param containerID id of the container
   * @return ContainerWithPipeline built from the container info and the
   *         pipeline the selector returns for the info's pipeline ID.
   */
  public ContainerWithPipeline getContainer(PipelineSelector selector,
      ContainerID containerID) {
    ContainerInfo info = containers.getContainerInfo(containerID.getId());
    Pipeline pipeline = selector.getPipeline(info.getPipelineID());
    return new ContainerWithPipeline(info, pipeline);
  }

  /**
   * Returns the containerInfo for the given container id.
   *
   * @param containerID id of the container
   * @return ContainerInfo containerInfo
   */
  public ContainerInfo getContainer(ContainerID containerID) {
    return containers.getContainerInfo(containerID);
  }

  /**
   * Nothing is released here; the method only satisfies {@link Closeable}.
   */
  @Override
  public void close() throws IOException {
  }

  /**
   * Returns the latest list of DataNodes where replica for given containerId
   * exist. Throws an SCMException if no entry is found for given containerId.
   *
   * @param containerID id of the container
   * @return Set<DatanodeDetails>
   * @throws SCMException if no entry is found for the given containerId.
   */
  public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
      throws SCMException {
    return containers.getContainerReplicas(containerID);
  }

  /**
   * Add a container Replica for given DataNode.
   *
   * @param containerID id of the container
   * @param dn datanode holding the replica
   */
  public void addContainerReplica(ContainerID containerID, DatanodeDetails dn) {
    containers.addContainerReplica(containerID, dn);
  }

  /**
   * Remove a container Replica for given DataNode.
   *
   * @param containerID id of the container
   * @param dn datanode whose replica is removed
   * @return True if dataNode is removed successfully else false.
   * @throws SCMException on failure reported by the container state map.
   */
  public boolean removeContainerReplica(ContainerID containerID,
      DatanodeDetails dn) throws SCMException {
    return containers.removeContainerReplica(containerID, dn);
  }

  /**
   * Compare the existing replication number with the expected one.
   *
   * @param containerID id of the container to check
   * @return a ReplicationRequest carrying the existing and expected replica
   *         counts when they differ, null when they are equal.
   * @throws SCMException if the replicas of the container cannot be read.
   */
  public ReplicationRequest checkReplicationState(ContainerID containerID)
      throws SCMException {
    int existingReplicas = getContainerReplicas(containerID).size();
    int expectedReplicas = getContainer(containerID)
        .getReplicationFactor().getNumber();
    if (existingReplicas != expectedReplicas) {
      return new ReplicationRequest(containerID.getId(), existingReplicas,
          expectedReplicas);
    }
    return null;
  }

  /**
   * Checks if the container is open.
   *
   * @param containerID id of the container, must not be null
   * @return true if the container reports itself as open.
   */
  public boolean isOpen(ContainerID containerID) {
    Preconditions.checkNotNull(containerID);
    ContainerInfo container = Preconditions
        .checkNotNull(getContainer(containerID),
            "Container can't be found " + containerID);
    return container.isContainerOpen();
  }

  @VisibleForTesting
  public ContainerStateMap getContainerStateMap() {
    return containers;
  }

}
- */ -package org.apache.hadoop.hdds.scm.container; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Mapping class contains the mapping from a name to a pipeline mapping. This is - * used by SCM when allocating new locations and when looking up a key. - */ -public interface Mapping extends Closeable { - /** - * Returns the ContainerInfo from the container ID. - * - * @param containerID - ID of container. - * @return - ContainerInfo such as creation state and the pipeline. - * @throws IOException - */ - ContainerInfo getContainer(long containerID) throws IOException; - - /** - * Returns the ContainerWithPipeline from the container ID. - * - * @param containerID - ID of container. - * @return - ContainerWithPipeline such as creation state and the pipeline. - * @throws IOException - */ - ContainerWithPipeline getContainerWithPipeline(long containerID) - throws IOException; - - /** - * Returns containers under certain conditions. - * Search container IDs from start ID(exclusive), - * The max size of the searching range cannot exceed the - * value of count. - * - * @param startContainerID start containerID, >=0, - * start searching at the head if 0.
- * @param count count must be >= 0 - * Usually the count will be replaced with a very big - * value instead of being unlimited in case the db is very big. - * - * @return a list of container. - * @throws IOException - */ - List<ContainerInfo> listContainer(long startContainerID, int count) - throws IOException; - - /** - * Allocates a new container for a given replication type and factor. - * - * @param type - replication type of the container. - * @param replicationFactor - replication factor of the container. - * @param owner - owner of the container. - * @return - ContainerWithPipeline. - * @throws IOException - */ - ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor replicationFactor, String owner) - throws IOException; - - /** - * Deletes a container from SCM. - * - * @param containerID - Container ID - * @throws IOException - */ - void deleteContainer(long containerID) throws IOException; - - /** - * Update container state. - * @param containerID - Container ID - * @param event - container life cycle event - * @return - new container state - * @throws IOException - */ - HddsProtos.LifeCycleState updateContainerState(long containerID, - HddsProtos.LifeCycleEvent event) throws IOException; - - /** - * Returns the container State Manager. - * @return ContainerStateManager - */ - ContainerStateManager getStateManager(); - - /** - * Process container report from Datanode. - * - * @param datanodeDetails Datanode the report belongs to - * @param reports Container report - * @param isRegisterCall presumably true when the report is sent as part of - * a datanode registration -- TODO confirm against the implementation - * @throws IOException - */ - void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports, boolean isRegisterCall) - throws IOException; - - /** - * Update deleteTransactionId according to deleteTransactionMap. - * - * @param deleteTransactionMap Maps the containerId to latest delete - * transaction id for the container. - * @throws IOException - */ - void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) - throws IOException; - - /** - * Returns a ContainerWithPipeline matching the given attributes. - * - * @param size - space needed in the container. - * @param owner - owner of the container. - * @param type - replication type of the container. - * @param factor - replication factor of the container. - * @param state - life cycle state of the container.
- * @return ContainerWithPipeline - * @throws IOException - */ - ContainerWithPipeline getMatchingContainerWithPipeline(long size, - String owner, ReplicationType type, ReplicationFactor factor, - LifeCycleState state) throws IOException; - - PipelineSelector getPipelineSelector(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java deleted file mode 100644 index ee02bbd..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * - */ - -/** - * This package has class that close a container. That is move a container from - * open state to close state.
- */ -package org.apache.hadoop.hdds.scm.container.closer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java deleted file mode 100644 index 3f8d056..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.hdds.scm.container; -/** - * This package contains routines to manage the container location and - * mapping inside SCM - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java deleted file mode 100644 index 3336c8e..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.hdds.scm.container.placement.algorithms; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; - -import java.io.IOException; -import java.util.List; - -/** - * A ContainerPlacementPolicy supports choosing datanodes to build a replication - * pipeline with specified constraints. - */ -public interface ContainerPlacementPolicy { - - /** - * Given the number of nodes and the size required, returns the list of - * datanodes that satisfy the nodes and size requirement. - * - * @param excludedNodes - list of nodes to be excluded. - * @param nodesRequired - number of datanodes required. - * @param sizeRequired - size required for the container or block. - * @return list of datanodes chosen. - * @throws IOException if the policy fails to choose the datanodes. - */ - List<DatanodeDetails> chooseDatanodes(List<DatanodeDetails> excludedNodes, - int nodesRequired, long sizeRequired) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java deleted file mode 100644 index 60861b7..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.container.placement.algorithms; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.stream.Collectors; - -/** - * SCM CommonPolicy implements a set of invariants which are common - * for all container placement policies, and acts as the repository of helper - * functions which are common to placement policies. - */ -public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { - @VisibleForTesting - static final Logger LOG = - LoggerFactory.getLogger(SCMCommonPolicy.class); - private final NodeManager nodeManager; - private final Random rand; - private final Configuration conf; - - /** - * Constructs SCM Common Policy Class. A new Random instance is created - * for use by the derived policies. - * - * @param nodeManager NodeManager - * @param conf Configuration class. - */ - public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) { - this.nodeManager = nodeManager; - this.rand = new Random(); - this.conf = conf; - } - - /** - * Return node manager. 
- * - * @return node manager - */ - public NodeManager getNodeManager() { - return nodeManager; - } - - /** - * Returns the Random Object. - * - * @return rand - */ - public Random getRand() { - return rand; - } - - /** - * Get Config. - * - * @return Configuration - */ - public Configuration getConf() { - return conf; - } - - /** - * Given the number of datanodes and the size required, returns the healthy - * datanodes that satisfy the size requirement. - * <p> - * Here are some invariants of container placement. - * <p> - * 1. We place containers only on healthy nodes. - * 2. We place containers on nodes with enough space for that container. - * 3. If a set of containers is requested, we either meet the required - * number of nodes or we fail that request. - * <p> - * The returned list holds every healthy, non-excluded node that has enough - * space, so it may contain more than nodesRequired entries. - * <p> - * NOTE(review): excludedNodes is handed straight to removeAll on the list - * returned by the node manager, without a null check. It is presumably - * expected to be non-null and that list to be modifiable; confirm against - * the callers and the NodeManager implementation. - * - * @param excludedNodes - datanodes with existing replicas - * @param nodesRequired - number of datanodes required. - * @param sizeRequired - size required for the container or block. - * @return list of candidate datanodes, at least nodesRequired in size. - * @throws SCMException if no healthy node is left after the exclusion, if - * fewer than nodesRequired healthy nodes are left, or if fewer than - * nodesRequired of them have enough space. - */ - - public List<DatanodeDetails> chooseDatanodes( - List<DatanodeDetails> excludedNodes, - int nodesRequired, final long sizeRequired) throws SCMException { - List<DatanodeDetails> healthyNodes = - nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); - healthyNodes.removeAll(excludedNodes); - String msg; - if (healthyNodes.size() == 0) { - msg = "No healthy node found to allocate container."; - LOG.error(msg); - throw new SCMException(msg, SCMException.ResultCodes - .FAILED_TO_FIND_HEALTHY_NODES); - } - - if (healthyNodes.size() < nodesRequired) { - msg = String.format("Not enough healthy nodes to allocate container. %d " - + " datanodes required. 
Found %d", - nodesRequired, healthyNodes.size()); - LOG.error(msg); - throw new SCMException(msg, - SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); - } - List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d -> - hasEnoughSpace(d, sizeRequired)).collect(Collectors.toList()); - - if (healthyList.size() < nodesRequired) { - msg = String.format("Unable to find enough nodes that meet the space " + - "requirement of %d bytes in healthy node set." + - " Nodes required: %d Found: %d", - sizeRequired, nodesRequired, healthyList.size()); - LOG.error(msg); - throw new SCMException(msg, - SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE); - } - - return healthyList; - } - - /** - * Returns true if this node has enough space to meet our requirement. - * A node for which the node manager has no metric is reported as not - * having enough space. - * - * @param datanodeDetails DatanodeDetails - * @param sizeRequired size that must be available on the node. - * @return true if we have enough space. - */ - private boolean hasEnoughSpace(DatanodeDetails datanodeDetails, - long sizeRequired) { - SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails); - return (nodeMetric != null) && (nodeMetric.get() != null) - && nodeMetric.get().getRemaining().hasResources(sizeRequired); - } - - /** - * This function invokes the derived class's chooseNode function - * nodesRequired times to build a list of nodes. Then it verifies that the - * invoked policy was able to return the expected number of nodes. A null - * returned by chooseNode is skipped. - * - * @param nodesRequired - Nodes Required - * @param healthyNodes - List of candidate nodes handed to chooseNode. - * @return List of Datanodes that can be used for placement. - * @throws SCMException if fewer than nodesRequired nodes were chosen. - */ - public List<DatanodeDetails> getResultSet( - int nodesRequired, List<DatanodeDetails> healthyNodes) - throws SCMException { - List<DatanodeDetails> results = new LinkedList<>(); - for (int x = 0; x < nodesRequired; x++) { - // invoke the choose function defined in the derived classes. 
- DatanodeDetails nodeId = chooseNode(healthyNodes); - if (nodeId != null) { - results.add(nodeId); - } - } - - if (results.size() < nodesRequired) { - LOG.error("Unable to find the required number of healthy nodes that " + - "meet the criteria. Required nodes: {}, Found nodes: {}", - nodesRequired, results.size()); - throw new SCMException("Unable to find required number of nodes.", - SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); - } - return results; - } - - /** - * Chooses a datanode according to the policy. This function is implemented - * by the actual policy class, for example SCMContainerPlacementCapacity or - * SCMContainerPlacementRandom. - * - * @param healthyNodes - List of healthy nodes we can choose from. - * @return the chosen DatanodeDetails; getResultSet ignores a null result. - */ - public abstract DatanodeDetails chooseNode( - List<DatanodeDetails> healthyNodes); - - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java deleted file mode 100644 index 8df8f6e..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.container.placement.algorithms; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Container placement policy that randomly chooses datanodes with remaining - * space to satisfy the size constraints. - * <p> - * The algorithm is as follows: pick 2 random nodes from a given pool of nodes - * and then pick the node which has lower utilization. This leads to a higher - * probability of nodes with lower utilization being picked. - * <p> - * For those wondering why we choose two nodes randomly and then choose the - * node with lower utilization, there are links to the original papers in - * HDFS-11564. - * <p> - * A brief summary -- We rank the nodes on a scale of lowest utilized to - * highest utilized; there are (s * (s - 1)) / 2 possibilities to build - * distinct pairs of nodes. There are s - k pairs of nodes in which the rank - * k node is the less utilized of the pair. So the probability of picking the - * rank k node is (2 * (s - k)) / (s * (s - 1)). 
- * <p> - * In English, There is a much higher probability of picking less utilized nodes - * as compared to nodes with higher utilization since we pick 2 nodes and - * then pick the node with lower utilization. - * <p> - * This avoids the issue of users adding new nodes into the cluster and HDFS - * sending all traffic to those nodes if we only use a capacity based - * allocation scheme. Unless those nodes are part of the set of the first 2 - * nodes then newer nodes will not be in the running to get the container. - * <p> - * This leads to an I/O pattern where the lower utilized nodes are favoured - * more than higher utilized nodes, but part of the I/O will still go to the - * older higher utilized nodes. - * <p> - * With this algorithm in place, our hope is that balancer tool needs to do - * little or no work and the cluster will achieve a balanced distribution - * over time. - */ -public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { - @VisibleForTesting - static final Logger LOG = - LoggerFactory.getLogger(SCMContainerPlacementCapacity.class); - - /** - * Constructs a container placement policy that considers only capacity. - * That is, this policy tries to place containers based on node weight. - * - * @param nodeManager Node Manager - * @param conf Configuration - */ - public SCMContainerPlacementCapacity(final NodeManager nodeManager, - final Configuration conf) { - super(nodeManager, conf); - } - - /** - * Called by SCM to choose datanodes. - * <p> - * When the common policy yields exactly nodesRequired candidates they are - * returned as is; otherwise nodesRequired of them are picked through - * getResultSet, which calls chooseNode. - * - * @param excludedNodes - list of the datanodes to exclude. - * @param nodesRequired - number of datanodes required. - * @param sizeRequired - size required for the container or block. - * @return List of datanodes. 
- * @throws SCMException SCMException - */ - @Override - public List<DatanodeDetails> chooseDatanodes( - List<DatanodeDetails> excludedNodes, final int nodesRequired, - final long sizeRequired) throws SCMException { - List<DatanodeDetails> healthyNodes = - super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); - if (healthyNodes.size() == nodesRequired) { - return healthyNodes; - } - return getResultSet(nodesRequired, healthyNodes); - } - - /** - * Picks two random indexes into the healthy list, compares the node metrics - * of the two candidates and returns one of them after removing it from the - * list that we are operating on. If both indexes are the same, that node is - * returned without any comparison. - * <p> - * NOTE(review): the first candidate is chosen when its metric isGreater - * than the second candidate's metric; confirm that isGreater ranks the - * preferred (lower utilized) node as the greater one. - * <p> - * The list must not be empty, because its size is used as the bound for - * Random.nextInt. - * - * @param healthyNodes - List of healthy nodes that meet the size - * requirement. The chosen node is removed from it. - * @return DatanodeDetails that is chosen. - */ - @Override - public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) { - int firstNodeNdx = getRand().nextInt(healthyNodes.size()); - int secondNodeNdx = getRand().nextInt(healthyNodes.size()); - - DatanodeDetails datanodeDetails; - // There is a possibility that both numbers will be same. - // if that is so, we just return the node. - if (firstNodeNdx == secondNodeNdx) { - datanodeDetails = healthyNodes.get(firstNodeNdx); - } else { - DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx); - DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx); - SCMNodeMetric firstNodeMetric = - getNodeManager().getNodeStat(firstNodeDetails); - SCMNodeMetric secondNodeMetric = - getNodeManager().getNodeStat(secondNodeDetails); - datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get()) - ? 
firstNodeDetails : secondNodeDetails; - } - healthyNodes.remove(datanodeDetails); - return datanodeDetails; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java deleted file mode 100644 index 76702d5..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.hdds.scm.container.placement.algorithms; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * Container placement policy that randomly chooses healthy datanodes. - * This is very similar to current HDFS placement. That is, we - * just randomly place containers without any consideration of utilization. - * <p> - * That means we rely on the balancer to achieve even distribution of data. - * The balancer will need to support containers as a feature before this - * class can be practically used. - */ -public final class SCMContainerPlacementRandom extends SCMCommonPolicy - implements ContainerPlacementPolicy { - @VisibleForTesting - static final Logger LOG = - LoggerFactory.getLogger(SCMContainerPlacementRandom.class); - - /** - * Constructs a random container placement policy. - * - * @param nodeManager nodeManager - * @param conf Config - */ - public SCMContainerPlacementRandom(final NodeManager nodeManager, - final Configuration conf) { - super(nodeManager, conf); - } - - /** - * Called by the SCM to choose datanodes. - * <p> - * When the common policy yields exactly nodesRequired candidates they are - * returned as is; otherwise nodesRequired of them are picked through - * getResultSet, which calls chooseNode. - * - * @param excludedNodes - list of the datanodes to exclude. - * @param nodesRequired - number of datanodes required. - * @param sizeRequired - size required for the container or block. - * @return List of Datanodes. 
- * @throws SCMException SCMException - */ - @Override - public List<DatanodeDetails> chooseDatanodes( - List<DatanodeDetails> excludedNodes, final int nodesRequired, - final long sizeRequired) throws SCMException { - List<DatanodeDetails> healthyNodes = - super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); - - if (healthyNodes.size() == nodesRequired) { - return healthyNodes; - } - return getResultSet(nodesRequired, healthyNodes); - } - - /** - * Just choose a node randomly and remove it from the list of nodes we can - * choose from. - * <p> - * The list must not be empty, because its size is used as the bound for - * Random.nextInt. - * - * @param healthyNodes - candidate datanodes; the chosen node is removed - * from this list. - * @return the randomly chosen datanode. - */ - public DatanodeDetails chooseNode(final List<DatanodeDetails> healthyNodes) { - DatanodeDetails selectedNode = - healthyNodes.get(getRand().nextInt(healthyNodes.size())); - healthyNodes.remove(selectedNode); - return selectedNode; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java deleted file mode 100644 index 1cb810d..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container.placement.algorithms; -// Various placement algorithms. \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
