http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d37661/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java new file mode 100644 index 0000000..5635621 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -0,0 +1,572 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.FSDirectory; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A Class to track the block collection IDs (Inode's ID) for which physical + * storage movement needed as per the Namespace and StorageReports from DN. + * It scan the pending directories for which storage movement is required and + * schedule the block collection IDs for movement. It track the info of + * scheduled items and remove the SPS xAttr from the file/Directory once + * movement is success. + */ +@InterfaceAudience.Private +public class BlockStorageMovementNeeded { + + public static final Logger LOG = + LoggerFactory.getLogger(BlockStorageMovementNeeded.class); + + private final Queue<ItemInfo> storageMovementNeeded = + new LinkedList<ItemInfo>(); + + /** + * Map of startId and number of child's. Number of child's indicate the + * number of files pending to satisfy the policy. + */ + private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory = + new HashMap<Long, DirPendingWorkInfo>(); + + private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus = + new ConcurrentHashMap<>(); + + private final Namesystem namesystem; + + // List of pending dir to satisfy the policy + private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>(); + + private final StoragePolicySatisfier sps; + + private Daemon inodeIdCollector; + + private final int maxQueuedItem; + + // Amount of time to cache the SUCCESS status of path before turning it to + // NOT_AVAILABLE. + private static long statusClearanceElapsedTimeMs = 300000; + + public BlockStorageMovementNeeded(Namesystem namesystem, + StoragePolicySatisfier sps, int queueLimit) { + this.namesystem = namesystem; + this.sps = sps; + this.maxQueuedItem = queueLimit; + } + + /** + * Add the candidate to tracking list for which storage movement + * expected if necessary. + * + * @param trackInfo + * - track info for satisfy the policy + */ + public synchronized void add(ItemInfo trackInfo) { + spsStatus.put(trackInfo.getStartId(), + new StoragePolicySatisfyPathStatusInfo( + StoragePolicySatisfyPathStatus.IN_PROGRESS)); + storageMovementNeeded.add(trackInfo); + } + + /** + * Add the itemInfo to tracking list for which storage movement + * expected if necessary. + * @param startId + * - start id + * @param itemInfoList + * - List of child in the directory + */ + @VisibleForTesting + public synchronized void addAll(long startId, + List<ItemInfo> itemInfoList, boolean scanCompleted) { + storageMovementNeeded.addAll(itemInfoList); + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); + if (pendingWork == null) { + pendingWork = new DirPendingWorkInfo(); + pendingWorkForDirectory.put(startId, pendingWork); + } + pendingWork.addPendingWorkCount(itemInfoList.size()); + if (scanCompleted) { + pendingWork.markScanCompleted(); + } + } + + /** + * Gets the block collection id for which storage movements check necessary + * and make the movement if required. + * + * @return block collection ID + */ + public synchronized ItemInfo get() { + return storageMovementNeeded.poll(); + } + + public synchronized void addToPendingDirQueue(long id) { + spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo( + StoragePolicySatisfyPathStatus.PENDING)); + spsDirsToBeTraveresed.add(id); + // Notify waiting FileInodeIdCollector thread about the newly + // added SPS path. + synchronized (spsDirsToBeTraveresed) { + spsDirsToBeTraveresed.notify(); + } + } + + /** + * Returns queue remaining capacity. + */ + public synchronized int remainingCapacity() { + int size = storageMovementNeeded.size(); + if (size >= maxQueuedItem) { + return 0; + } else { + return (maxQueuedItem - size); + } + } + + /** + * Returns queue size. + */ + public synchronized int size() { + return storageMovementNeeded.size(); + } + + public synchronized void clearAll() { + spsDirsToBeTraveresed.clear(); + storageMovementNeeded.clear(); + pendingWorkForDirectory.clear(); + } + + /** + * Decrease the pending child count for directory once one file blocks moved + * successfully. Remove the SPS xAttr if pending child count is zero. + */ + public synchronized void removeItemTrackInfo(ItemInfo trackInfo, + boolean isSuccess) throws IOException { + if (trackInfo.isDir()) { + // If track is part of some start inode then reduce the pending + // directory work count. + long startId = trackInfo.getStartId(); + INode inode = namesystem.getFSDirectory().getInode(startId); + if (inode == null) { + // directory deleted just remove it. + this.pendingWorkForDirectory.remove(startId); + updateStatus(startId, isSuccess); + } else { + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); + if (pendingWork != null) { + pendingWork.decrementPendingWorkCount(); + if (pendingWork.isDirWorkDone()) { + namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY); + pendingWorkForDirectory.remove(startId); + pendingWork.setFailure(!isSuccess); + updateStatus(startId, pendingWork.isPolicySatisfied()); + } + pendingWork.setFailure(isSuccess); + } + } + } else { + // Remove xAttr if trackID doesn't exist in + // storageMovementAttemptedItems or file policy satisfied. + namesystem.removeXattr(trackInfo.getTrackId(), + XATTR_SATISFY_STORAGE_POLICY); + updateStatus(trackInfo.getStartId(), isSuccess); + } + } + + public synchronized void clearQueue(long trackId) { + spsDirsToBeTraveresed.remove(trackId); + Iterator<ItemInfo> iterator = storageMovementNeeded.iterator(); + while (iterator.hasNext()) { + ItemInfo next = iterator.next(); + if (next.getStartId() == trackId) { + iterator.remove(); + } + } + pendingWorkForDirectory.remove(trackId); + } + + /** + * Mark inode status as SUCCESS in map. + */ + private void updateStatus(long startId, boolean isSuccess){ + StoragePolicySatisfyPathStatusInfo spsStatusInfo = + spsStatus.get(startId); + if (spsStatusInfo == null) { + spsStatusInfo = new StoragePolicySatisfyPathStatusInfo(); + spsStatus.put(startId, spsStatusInfo); + } + + if (isSuccess) { + spsStatusInfo.setSuccess(); + } else { + spsStatusInfo.setFailure(); + } + } + + /** + * Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded + * and notify to clean up required resources. + * @throws IOException + */ + public synchronized void clearQueuesWithNotification() { + // Remove xAttr from directories + Long trackId; + while ((trackId = spsDirsToBeTraveresed.poll()) != null) { + try { + // Remove xAttr for file + namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY); + } catch (IOException ie) { + LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie); + } + } + + // File's directly added to storageMovementNeeded, So try to remove + // xAttr for file + ItemInfo itemInfo; + while ((itemInfo = storageMovementNeeded.poll()) != null) { + try { + // Remove xAttr for file + if (!itemInfo.isDir()) { + namesystem.removeXattr(itemInfo.getTrackId(), + XATTR_SATISFY_STORAGE_POLICY); + } + } catch (IOException ie) { + LOG.warn( + "Failed to remove SPS xattr for track id " + + itemInfo.getTrackId(), ie); + } + } + this.clearAll(); + } + + /** + * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child + * ID's to process for satisfy the policy. + */ + private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser + implements Runnable { + + private int remainingCapacity = 0; + + private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem); + + StorageMovementPendingInodeIdCollector(FSDirectory dir) { + super(dir); + } + + @Override + public void run() { + LOG.info("Starting FileInodeIdCollector!."); + long lastStatusCleanTime = 0; + while (namesystem.isRunning() && sps.isRunning()) { + try { + if (!namesystem.isInSafeMode()) { + FSDirectory fsd = namesystem.getFSDirectory(); + Long startINodeId = spsDirsToBeTraveresed.poll(); + if (startINodeId == null) { + // Waiting for SPS path + synchronized (spsDirsToBeTraveresed) { + spsDirsToBeTraveresed.wait(5000); + } + } else { + INode startInode = fsd.getInode(startINodeId); + if (startInode != null) { + try { + remainingCapacity = remainingCapacity(); + spsStatus.put(startINodeId, + new StoragePolicySatisfyPathStatusInfo( + StoragePolicySatisfyPathStatus.IN_PROGRESS)); + readLock(); + traverseDir(startInode.asDirectory(), startINodeId, + HdfsFileStatus.EMPTY_NAME, + new SPSTraverseInfo(startINodeId)); + } finally { + readUnlock(); + } + // Mark startInode traverse is done + addAll(startInode.getId(), currentBatch, true); + currentBatch.clear(); + + // check if directory was empty and no child added to queue + DirPendingWorkInfo dirPendingWorkInfo = + pendingWorkForDirectory.get(startInode.getId()); + if (dirPendingWorkInfo.isDirWorkDone()) { + namesystem.removeXattr(startInode.getId(), + XATTR_SATISFY_STORAGE_POLICY); + pendingWorkForDirectory.remove(startInode.getId()); + updateStatus(startInode.getId(), true); + } + } + } + //Clear the SPS status if status is in SUCCESS more than 5 min. + if (Time.monotonicNow() + - lastStatusCleanTime > statusClearanceElapsedTimeMs) { + lastStatusCleanTime = Time.monotonicNow(); + cleanSpsStatus(); + } + } + } catch (Throwable t) { + LOG.warn("Exception while loading inodes to satisfy the policy", t); + } + } + } + + private synchronized void cleanSpsStatus() { + for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it = + spsStatus.entrySet().iterator(); it.hasNext();) { + Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next(); + if (entry.getValue().canRemove()) { + it.remove(); + } + } + } + + @Override + protected void checkPauseForTesting() throws InterruptedException { + // TODO implement if needed + } + + @Override + protected boolean processFileInode(INode inode, TraverseInfo traverseInfo) + throws IOException, InterruptedException { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing {} for statisy the policy", + inode.getFullPathName()); + } + if (!inode.isFile()) { + return false; + } + if (inode.isFile() && inode.asFile().numBlocks() != 0) { + currentBatch.add(new ItemInfo( + ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); + remainingCapacity--; + } + return true; + } + + @Override + protected boolean canSubmitCurrentBatch() { + return remainingCapacity <= 0; + } + + @Override + protected void checkINodeReady(long startId) throws IOException { + // SPS work won't be scheduled if NN is in standby. So, skipping NN + // standby check. + return; + } + + @Override + protected void submitCurrentBatch(long startId) + throws IOException, InterruptedException { + // Add current child's to queue + addAll(startId, currentBatch, false); + currentBatch.clear(); + } + + @Override + protected void throttle() throws InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("StorageMovementNeeded queue remaining capacity is zero," + + " waiting for some free slots."); + } + remainingCapacity = remainingCapacity(); + // wait for queue to be free + while (remainingCapacity <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for storageMovementNeeded queue to be free!"); + } + Thread.sleep(5000); + remainingCapacity = remainingCapacity(); + } + } + + @Override + protected boolean canTraverseDir(INode inode) throws IOException { + return true; + } + } + + /** + * Info for directory recursive scan. + */ + public static class DirPendingWorkInfo { + + private int pendingWorkCount = 0; + private boolean fullyScanned = false; + private boolean success = true; + + /** + * Increment the pending work count for directory. + */ + public synchronized void addPendingWorkCount(int count) { + this.pendingWorkCount = this.pendingWorkCount + count; + } + + /** + * Decrement the pending work count for directory one track info is + * completed. + */ + public synchronized void decrementPendingWorkCount() { + this.pendingWorkCount--; + } + + /** + * Return true if all the pending work is done and directory fully + * scanned, otherwise false. + */ + public synchronized boolean isDirWorkDone() { + return (pendingWorkCount <= 0 && fullyScanned); + } + + /** + * Mark directory scan is completed. + */ + public synchronized void markScanCompleted() { + this.fullyScanned = true; + } + + /** + * Return true if all the files block movement is success, otherwise false. + */ + public boolean isPolicySatisfied() { + return success; + } + + /** + * Set directory SPS status failed. + */ + public void setFailure(boolean failure) { + this.success = this.success || failure; + } + } + + public void init() { + inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector( + namesystem.getFSDirectory())); + inodeIdCollector.setName("FileInodeIdCollector"); + inodeIdCollector.start(); + } + + public void close() { + if (inodeIdCollector != null) { + inodeIdCollector.interrupt(); + } + } + + class SPSTraverseInfo extends TraverseInfo { + private long startId; + + SPSTraverseInfo(long startId) { + this.startId = startId; + } + + public long getStartId() { + return startId; + } + } + + /** + * Represent the file/directory block movement status. + */ + static class StoragePolicySatisfyPathStatusInfo { + private StoragePolicySatisfyPathStatus status = + StoragePolicySatisfyPathStatus.NOT_AVAILABLE; + private long lastStatusUpdateTime; + + StoragePolicySatisfyPathStatusInfo() { + this.lastStatusUpdateTime = 0; + } + + StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) { + this.status = status; + this.lastStatusUpdateTime = 0; + } + + private void setSuccess() { + this.status = StoragePolicySatisfyPathStatus.SUCCESS; + this.lastStatusUpdateTime = Time.monotonicNow(); + } + + private void setFailure() { + this.status = StoragePolicySatisfyPathStatus.FAILURE; + this.lastStatusUpdateTime = Time.monotonicNow(); + } + + private StoragePolicySatisfyPathStatus getStatus() { + return status; + } + + /** + * Return true if SUCCESS status cached more then 5 min. + */ + private boolean canRemove() { + return (StoragePolicySatisfyPathStatus.SUCCESS == status + || StoragePolicySatisfyPathStatus.FAILURE == status) + && (Time.monotonicNow() + - lastStatusUpdateTime) > statusClearanceElapsedTimeMs; + } + } + + public StoragePolicySatisfyPathStatus getStatus(long id) { + StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id); + if(spsStatusInfo == null){ + return StoragePolicySatisfyPathStatus.NOT_AVAILABLE; + } + return spsStatusInfo.getStatus(); + } + + @VisibleForTesting + public static void setStatusClearanceElapsedTimeMs( + long statusClearanceElapsedTimeMs) { + BlockStorageMovementNeeded.statusClearanceElapsedTimeMs = + statusClearanceElapsedTimeMs; + } + + @VisibleForTesting + public static long getStatusClearanceElapsedTimeMs() { + return statusClearanceElapsedTimeMs; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d37661/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java new file mode 100644 index 0000000..0d4bb19 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -0,0 +1,988 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.sps; + +import static org.apache.hadoop.util.Time.monotonicNow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; +import org.apache.hadoop.hdfs.server.balancer.Matcher; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Setting storagePolicy on a file after the file write will only update the new + * storage policy type in Namespace, but physical block storage movement will + * not happen until user runs "Mover Tool" explicitly for such files. The + * StoragePolicySatisfier Daemon thread implemented for addressing the case + * where users may want to physically move the blocks by HDFS itself instead of + * running mover tool explicitly. Just calling client API to + * satisfyStoragePolicy on a file/dir will automatically trigger to move its + * physical storage locations as expected in asynchronous manner. Here Namenode + * will pick the file blocks which are expecting to change its storages, then it + * will build the mapping of source block location and expected storage type and + * location to move. After that this class will also prepare commands to send to + * Datanode for processing the physical block movements. + */ +@InterfaceAudience.Private +public class StoragePolicySatisfier implements Runnable { + public static final Logger LOG = + LoggerFactory.getLogger(StoragePolicySatisfier.class); + private Daemon storagePolicySatisfierThread; + private final Namesystem namesystem; + private final BlockManager blockManager; + private final BlockStorageMovementNeeded storageMovementNeeded; + private final BlockStorageMovementAttemptedItems storageMovementsMonitor; + private volatile boolean isRunning = false; + private int spsWorkMultiplier; + private long blockCount = 0L; + private int blockMovementMaxRetry; + private final Context ctxt; + + /** + * An interface for analyzing and assigning the block storage movements to + * worker nodes. + */ + // TODO: Now, added one API which is required for sps package. Will refine + // this interface via HDFS-12911. + public interface Context { + int getNumLiveDataNodes(); + } + + /** + * Represents the collective analysis status for all blocks. + */ + private static class BlocksMovingAnalysis { + + enum Status { + // Represents that, the analysis skipped due to some conditions. A such + // condition is if block collection is in incomplete state. + ANALYSIS_SKIPPED_FOR_RETRY, + // Represents that few or all blocks found respective target to do + // the storage movement. + BLOCKS_TARGETS_PAIRED, + // Represents that none of the blocks found respective target to do + // the storage movement. + NO_BLOCKS_TARGETS_PAIRED, + // Represents that, none of the blocks found for block storage movements. + BLOCKS_ALREADY_SATISFIED, + // Represents that, the analysis skipped due to some conditions. + // Example conditions are if no blocks really exists in block collection + // or + // if analysis is not required on ec files with unsuitable storage + // policies + BLOCKS_TARGET_PAIRING_SKIPPED, + // Represents that, All the reported blocks are satisfied the policy but + // some of the blocks are low redundant. + FEW_LOW_REDUNDANCY_BLOCKS + } + + private Status status = null; + private List<Block> assignedBlocks = null; + + BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) { + this.status = status; + this.assignedBlocks = blockMovingInfo; + } + } + + public StoragePolicySatisfier(final Namesystem namesystem, + final BlockManager blkManager, Configuration conf, Context ctxt) { + this.namesystem = namesystem; + this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem, + this, conf.getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT)); + this.blockManager = blkManager; + this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( + conf.getLong( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT), + conf.getLong( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT), + storageMovementNeeded); + this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf); + this.blockMovementMaxRetry = conf.getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT); + this.ctxt = ctxt; + } + + /** + * Start storage policy satisfier demon thread. Also start block storage + * movements monitor for retry the attempts if needed. + */ + public synchronized void start(boolean reconfigStart) { + isRunning = true; + if (checkIfMoverRunning()) { + isRunning = false; + LOG.error( + "Stopping StoragePolicySatisfier thread " + "as Mover ID file " + + HdfsServerConstants.MOVER_ID_PATH.toString() + + " been opened. Maybe a Mover instance is running!"); + return; + } + if (reconfigStart) { + LOG.info("Starting StoragePolicySatisfier, as admin requested to " + + "start it."); + } else { + LOG.info("Starting StoragePolicySatisfier."); + } + + // Ensure that all the previously submitted block movements(if any) have to + // be stopped in all datanodes. + addDropSPSWorkCommandsToAllDNs(); + storageMovementNeeded.init(); + storagePolicySatisfierThread = new Daemon(this); + storagePolicySatisfierThread.setName("StoragePolicySatisfier"); + storagePolicySatisfierThread.start(); + this.storageMovementsMonitor.start(); + } + + /** + * Disables storage policy satisfier by stopping its services. + * + * @param forceStop + * true represents that it should stop SPS service by clearing all + * pending SPS work + */ + public synchronized void disable(boolean forceStop) { + isRunning = false; + + if (storagePolicySatisfierThread == null) { + return; + } + + storageMovementNeeded.close(); + + storagePolicySatisfierThread.interrupt(); + this.storageMovementsMonitor.stop(); + if (forceStop) { + storageMovementNeeded.clearQueuesWithNotification(); + addDropSPSWorkCommandsToAllDNs(); + } else { + LOG.info("Stopping StoragePolicySatisfier."); + } + } + + /** + * Timed wait to stop storage policy satisfier daemon threads. + */ + public synchronized void stopGracefully() { + if (isRunning) { + disable(true); + } + this.storageMovementsMonitor.stopGracefully(); + + if (storagePolicySatisfierThread == null) { + return; + } + try { + storagePolicySatisfierThread.join(3000); + } catch (InterruptedException ie) { + } + } + + /** + * Check whether StoragePolicySatisfier is running. + * @return true if running + */ + public boolean isRunning() { + return isRunning; + } + + // Return true if a Mover instance is running + private boolean checkIfMoverRunning() { + String moverId = HdfsServerConstants.MOVER_ID_PATH.toString(); + return namesystem.isFileOpenedForWrite(moverId); + } + + /** + * Adding drop commands to all datanodes to stop performing the satisfier + * block movements, if any. + */ + private void addDropSPSWorkCommandsToAllDNs() { + this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs(); + } + + @Override + public void run() { + while (namesystem.isRunning() && isRunning) { + try { + if (!namesystem.isInSafeMode()) { + ItemInfo itemInfo = storageMovementNeeded.get(); + if (itemInfo != null) { + if(itemInfo.getRetryCount() >= blockMovementMaxRetry){ + LOG.info("Failed to satisfy the policy after " + + blockMovementMaxRetry + " retries. Removing inode " + + itemInfo.getTrackId() + " from the queue"); + storageMovementNeeded.removeItemTrackInfo(itemInfo, false); + continue; + } + long trackId = itemInfo.getTrackId(); + BlockCollection blockCollection; + BlocksMovingAnalysis status = null; + try { + namesystem.readLock(); + blockCollection = namesystem.getBlockCollection(trackId); + // Check blockCollectionId existence. + if (blockCollection == null) { + // File doesn't exists (maybe got deleted), remove trackId from + // the queue + storageMovementNeeded.removeItemTrackInfo(itemInfo, true); + } else { + status = + analyseBlocksStorageMovementsAndAssignToDN( + blockCollection); + } + } finally { + namesystem.readUnlock(); + } + if (blockCollection != null) { + switch (status.status) { + // Just add to monitor, so it will be retried after timeout + case ANALYSIS_SKIPPED_FOR_RETRY: + // Just add to monitor, so it will be tracked for report and + // be removed on storage movement attempt finished report. + case BLOCKS_TARGETS_PAIRED: + this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo + .getStartId(), itemInfo.getTrackId(), monotonicNow(), + status.assignedBlocks, itemInfo.getRetryCount())); + break; + case NO_BLOCKS_TARGETS_PAIRED: + if (LOG.isDebugEnabled()) { + LOG.debug("Adding trackID " + trackId + + " back to retry queue as none of the blocks" + + " found its eligible targets."); + } + itemInfo.retryCount++; + this.storageMovementNeeded.add(itemInfo); + break; + case FEW_LOW_REDUNDANCY_BLOCKS: + if (LOG.isDebugEnabled()) { + LOG.debug("Adding trackID " + trackId + + " back to retry queue as some of the blocks" + + " are low redundant."); + } + this.storageMovementNeeded.add(itemInfo); + break; + // Just clean Xattrs + case BLOCKS_TARGET_PAIRING_SKIPPED: + case BLOCKS_ALREADY_SATISFIED: + default: + LOG.info("Block analysis skipped or blocks already satisfied" + + " with storages. So, Cleaning up the Xattrs."); + storageMovementNeeded.removeItemTrackInfo(itemInfo, true); + break; + } + } + } + } + int numLiveDn = ctxt.getNumLiveDataNodes(); + if (storageMovementNeeded.size() == 0 + || blockCount > (numLiveDn * spsWorkMultiplier)) { + Thread.sleep(3000); + blockCount = 0L; + } + } catch (Throwable t) { + handleException(t); + } + } + } + + private void handleException(Throwable t) { + // double check to avoid entering into synchronized block. + if (isRunning) { + synchronized (this) { + if (isRunning) { + isRunning = false; + // Stopping monitor thread and clearing queues as well + this.clearQueues(); + this.storageMovementsMonitor.stopGracefully(); + if (!namesystem.isRunning()) { + LOG.info("Stopping StoragePolicySatisfier."); + if (!(t instanceof InterruptedException)) { + LOG.info("StoragePolicySatisfier received an exception" + + " while shutting down.", t); + } + return; + } + } + } + } + LOG.error("StoragePolicySatisfier thread received runtime exception. " + + "Stopping Storage policy satisfier work", t); + return; + } + + private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( + BlockCollection blockCollection) { + BlocksMovingAnalysis.Status status = + BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED; + byte existingStoragePolicyID = blockCollection.getStoragePolicyID(); + BlockStoragePolicy existingStoragePolicy = + blockManager.getStoragePolicy(existingStoragePolicyID); + if (!blockCollection.getLastBlock().isComplete()) { + // Postpone, currently file is under construction + // So, should we add back? or leave it to user + LOG.info("BlockCollectionID: {} file is under construction. So, postpone" + + " this to the next retry iteration", blockCollection.getId()); + return new BlocksMovingAnalysis( + BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY, + new ArrayList<>()); + } + + BlockInfo[] blocks = blockCollection.getBlocks(); + if (blocks.length == 0) { + LOG.info("BlockCollectionID: {} file is not having any blocks." + + " So, skipping the analysis.", blockCollection.getId()); + return new BlocksMovingAnalysis( + BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED, + new ArrayList<>()); + } + List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>(); + + for (int i = 0; i < blocks.length; i++) { + BlockInfo blockInfo = blocks[i]; + List<StorageType> expectedStorageTypes; + if (blockInfo.isStriped()) { + if (ErasureCodingPolicyManager + .checkStoragePolicySuitableForECStripedMode( + existingStoragePolicyID)) { + expectedStorageTypes = existingStoragePolicy + .chooseStorageTypes((short) blockInfo.getCapacity()); + } else { + // Currently we support only limited policies (HOT, COLD, ALLSSD) + // for EC striped mode files. SPS will ignore to move the blocks if + // the storage policy is not in EC Striped mode supported policies + LOG.warn("The storage policy " + existingStoragePolicy.getName() + + " is not suitable for Striped EC files. " + + "So, ignoring to move the blocks"); + return new BlocksMovingAnalysis( + BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED, + new ArrayList<>()); + } + } else { + expectedStorageTypes = existingStoragePolicy + .chooseStorageTypes(blockInfo.getReplication()); + } + + DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo); + StorageType[] storageTypes = new StorageType[storages.length]; + for (int j = 0; j < storages.length; j++) { + DatanodeStorageInfo datanodeStorageInfo = storages[j]; + StorageType storageType = datanodeStorageInfo.getStorageType(); + storageTypes[j] = storageType; + } + List<StorageType> existing = + new LinkedList<StorageType>(Arrays.asList(storageTypes)); + if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, + existing, true)) { + boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos, + blockInfo, expectedStorageTypes, existing, storages); + if (blocksPaired) { + status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED; + } else { + // none of the blocks found its eligible targets for satisfying the + // storage policy. + status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED; + } + } else { + if (blockManager.hasLowRedundancyBlocks(blockCollection)) { + status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; + } + } + } + + List<Block> assignedBlockIds = new ArrayList<Block>(); + for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { + // Check for at least one block storage movement has been chosen + if (blkMovingInfo.getTarget() != null) { + // assign block storage movement task to the target node + ((DatanodeDescriptor) blkMovingInfo.getTarget()) + .addBlocksToMoveStorage(blkMovingInfo); + LOG.debug("BlockMovingInfo: {}", blkMovingInfo); + assignedBlockIds.add(blkMovingInfo.getBlock()); + blockCount++; + } + } + return new BlocksMovingAnalysis(status, assignedBlockIds); + } + + /** + * Compute the list of block moving information corresponding to the given + * blockId. This will check that each block location of the given block is + * satisfying the expected storage policy. If block location is not satisfied + * the policy then find out the target node with the expected storage type to + * satisfy the storage policy. + * + * @param blockMovingInfos + * - list of block source and target node pair + * @param blockInfo + * - block details + * @param expectedStorageTypes + * - list of expected storage type to satisfy the storage policy + * @param existing + * - list to get existing storage types + * @param storages + * - available storages + * @return false if some of the block locations failed to find target node to + * satisfy the storage policy, true otherwise + */ + private boolean computeBlockMovingInfos( + List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo, + List<StorageType> expectedStorageTypes, List<StorageType> existing, + DatanodeStorageInfo[] storages) { + boolean foundMatchingTargetNodesForBlock = true; + if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, + existing, true)) { + List<StorageTypeNodePair> sourceWithStorageMap = + new ArrayList<StorageTypeNodePair>(); + List<DatanodeStorageInfo> existingBlockStorages = + new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages)); + // if expected type exists in source node already, local movement would be + // possible, so lets find such sources first. + Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator(); + while (iterator.hasNext()) { + DatanodeStorageInfo datanodeStorageInfo = iterator.next(); + if (checkSourceAndTargetTypeExists( + datanodeStorageInfo.getDatanodeDescriptor(), existing, + expectedStorageTypes)) { + sourceWithStorageMap + .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(), + datanodeStorageInfo.getDatanodeDescriptor())); + iterator.remove(); + existing.remove(datanodeStorageInfo.getStorageType()); + } + } + + // Let's find sources for existing types left. + for (StorageType existingType : existing) { + iterator = existingBlockStorages.iterator(); + while (iterator.hasNext()) { + DatanodeStorageInfo datanodeStorageInfo = iterator.next(); + StorageType storageType = datanodeStorageInfo.getStorageType(); + if (storageType == existingType) { + iterator.remove(); + sourceWithStorageMap.add(new StorageTypeNodePair(storageType, + datanodeStorageInfo.getDatanodeDescriptor())); + break; + } + } + } + + StorageTypeNodeMap locsForExpectedStorageTypes = + findTargetsForExpectedStorageTypes(expectedStorageTypes); + + foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove( + blockMovingInfos, blockInfo, sourceWithStorageMap, + expectedStorageTypes, locsForExpectedStorageTypes); + } + return foundMatchingTargetNodesForBlock; + } + + /** + * Find the good target node for each source node for which block storages was + * misplaced. + * + * @param blockMovingInfos + * - list of block source and target node pair + * @param blockInfo + * - Block + * @param sourceWithStorageList + * - Source Datanode with storages list + * @param expected + * - Expecting storages to move + * @param locsForExpectedStorageTypes + * - Available DNs for expected storage types + * @return false if some of the block locations failed to find target node to + * satisfy the storage policy + */ + private boolean findSourceAndTargetToMove( + List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo, + List<StorageTypeNodePair> sourceWithStorageList, + List<StorageType> expected, + StorageTypeNodeMap locsForExpectedStorageTypes) { + boolean foundMatchingTargetNodesForBlock = true; + List<DatanodeDescriptor> excludeNodes = new ArrayList<>(); + + // Looping over all the source node locations and choose the target + // storage within same node if possible. This is done separately to + // avoid choosing a target which already has this block. + for (int i = 0; i < sourceWithStorageList.size(); i++) { + StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i); + + // Check whether the block replica is already placed in the expected + // storage type in this source datanode. + if (!expected.contains(existingTypeNodePair.storageType)) { + StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode( + blockInfo, existingTypeNodePair.dn, expected); + if (chosenTarget != null) { + if (blockInfo.isStriped()) { + buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn, + existingTypeNodePair.storageType, chosenTarget.dn, + chosenTarget.storageType, blockMovingInfos); + } else { + buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn, + existingTypeNodePair.storageType, chosenTarget.dn, + chosenTarget.storageType, blockMovingInfos); + } + expected.remove(chosenTarget.storageType); + // TODO: We can increment scheduled block count for this node? + } + } + // To avoid choosing this excludeNodes as targets later + excludeNodes.add(existingTypeNodePair.dn); + } + + // Looping over all the source node locations. Choose a remote target + // storage node if it was not found out within same node. + for (int i = 0; i < sourceWithStorageList.size(); i++) { + StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i); + StorageTypeNodePair chosenTarget = null; + // Chosen the target storage within same datanode. So just skipping this + // source node. + if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) { + continue; + } + if (chosenTarget == null && blockManager.getDatanodeManager() + .getNetworkTopology().isNodeGroupAware()) { + chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn, + expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes, + excludeNodes); + } + + // Then, match nodes on the same rack + if (chosenTarget == null) { + chosenTarget = + chooseTarget(blockInfo, existingTypeNodePair.dn, expected, + Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes); + } + + if (chosenTarget == null) { + chosenTarget = + chooseTarget(blockInfo, existingTypeNodePair.dn, expected, + Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes); + } + if (null != chosenTarget) { + if (blockInfo.isStriped()) { + buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn, + existingTypeNodePair.storageType, chosenTarget.dn, + chosenTarget.storageType, blockMovingInfos); + } else { + buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn, + existingTypeNodePair.storageType, chosenTarget.dn, + chosenTarget.storageType, blockMovingInfos); + } + + expected.remove(chosenTarget.storageType); + excludeNodes.add(chosenTarget.dn); + // TODO: We can increment scheduled block count for this node? + } else { + LOG.warn( + "Failed to choose target datanode for the required" + + " storage types {}, block:{}, existing storage type:{}", + expected, blockInfo, existingTypeNodePair.storageType); + } + } + + if (expected.size() > 0) { + foundMatchingTargetNodesForBlock = false; + } + + return foundMatchingTargetNodesForBlock; + } + + private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos, + DatanodeDescriptor dn) { + for (BlockMovingInfo blockMovingInfo : blockMovingInfos) { + if (blockMovingInfo.getSource().equals(dn)) { + return true; + } + } + return false; + } + + private void buildContinuousBlockMovingInfos(BlockInfo blockInfo, + DatanodeInfo sourceNode, StorageType sourceStorageType, + DatanodeInfo targetNode, StorageType targetStorageType, + List<BlockMovingInfo> blkMovingInfos) { + Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(), + blockInfo.getGenerationStamp()); + BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode, + targetNode, sourceStorageType, targetStorageType); + blkMovingInfos.add(blkMovingInfo); + } + + private void buildStripedBlockMovingInfos(BlockInfo blockInfo, + DatanodeInfo sourceNode, StorageType sourceStorageType, + DatanodeInfo targetNode, StorageType targetStorageType, + List<BlockMovingInfo> blkMovingInfos) { + // For a striped block, it needs to construct internal block at the given + // index of a block group. Here it is iterating over all the block indices + // and construct internal blocks which can be then considered for block + // movement. + BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo; + for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) { + if (si.getBlockIndex() >= 0) { + DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor(); + if (sourceNode.equals(dn)) { + // construct internal block + long blockId = blockInfo.getBlockId() + si.getBlockIndex(); + long numBytes = StripedBlockUtil.getInternalBlockLength( + sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(), + sBlockInfo.getDataBlockNum(), si.getBlockIndex()); + Block blk = new Block(blockId, numBytes, + blockInfo.getGenerationStamp()); + BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode, + targetNode, sourceStorageType, targetStorageType); + blkMovingInfos.add(blkMovingInfo); + } + } + } + } + + /** + * Choose the target storage within same datanode if possible. + * + * @param block + * - block info + * @param source + * - source datanode + * @param targetTypes + * - list of target storage types + */ + private StorageTypeNodePair chooseTargetTypeInSameNode(Block block, + DatanodeDescriptor source, List<StorageType> targetTypes) { + for (StorageType t : targetTypes) { + DatanodeStorageInfo chooseStorage4Block = + source.chooseStorage4Block(t, block.getNumBytes()); + if (chooseStorage4Block != null) { + return new StorageTypeNodePair(t, source); + } + } + return null; + } + + private StorageTypeNodePair chooseTarget(Block block, + DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher, + StorageTypeNodeMap locsForExpectedStorageTypes, + List<DatanodeDescriptor> excludeNodes) { + for (StorageType t : targetTypes) { + List<DatanodeDescriptor> nodesWithStorages = + locsForExpectedStorageTypes.getNodesWithStorages(t); + if (nodesWithStorages == null || nodesWithStorages.isEmpty()) { + continue; // no target nodes with the required storage type. + } + Collections.shuffle(nodesWithStorages); + for (DatanodeDescriptor target : nodesWithStorages) { + if (!excludeNodes.contains(target) && matcher.match( + blockManager.getDatanodeManager().getNetworkTopology(), source, + target)) { + if (null != target.chooseStorage4Block(t, block.getNumBytes())) { + return new StorageTypeNodePair(t, target); + } + } + } + } + return null; + } + + private static class StorageTypeNodePair { + private StorageType storageType = null; + private DatanodeDescriptor dn = null; + + StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) { + this.storageType = storageType; + this.dn = dn; + } + } + + private StorageTypeNodeMap findTargetsForExpectedStorageTypes( + List<StorageType> expected) { + StorageTypeNodeMap targetMap = new StorageTypeNodeMap(); + List<DatanodeDescriptor> reports = blockManager.getDatanodeManager() + .getDatanodeListForReport(DatanodeReportType.LIVE); + for (DatanodeDescriptor dn : reports) { + StorageReport[] storageReports = dn.getStorageReports(); + for (StorageReport storageReport : storageReports) { + StorageType t = storageReport.getStorage().getStorageType(); + if (expected.contains(t)) { + final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t); + if (maxRemaining > 0L) { + targetMap.add(t, dn); + } + } + } + } + return targetMap; + } + + private static long getMaxRemaining(StorageReport[] storageReports, + StorageType t) { + long max = 0L; + for (StorageReport r : storageReports) { + if (r.getStorage().getStorageType() == t) { + if (r.getRemaining() > max) { + max = r.getRemaining(); + } + } + } + return max; + } + + private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn, + List<StorageType> existing, List<StorageType> expectedStorageTypes) { + DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos(); + boolean isExpectedTypeAvailable = false; + boolean isExistingTypeAvailable = false; + for (DatanodeStorageInfo dnInfo : allDNStorageInfos) { + StorageType storageType = dnInfo.getStorageType(); + if (existing.contains(storageType)) { + isExistingTypeAvailable = true; + } + if (expectedStorageTypes.contains(storageType)) { + isExpectedTypeAvailable = true; + } + } + return isExistingTypeAvailable && isExpectedTypeAvailable; + } + + private static class StorageTypeNodeMap { + private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap = + new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class); + + private void add(StorageType t, DatanodeDescriptor dn) { + List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t); + LinkedList<DatanodeDescriptor> value = null; + if (nodesWithStorages == null) { + value = new LinkedList<DatanodeDescriptor>(); + value.add(dn); + typeNodeMap.put(t, value); + } else { + nodesWithStorages.add(dn); + } + } + + /** + * @param type + * - Storage type + * @return datanodes which has the given storage type + */ + private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) { + return typeNodeMap.get(type); + } + } + + /** + * Receives set of storage movement attempt finished blocks report. + * + * @param moveAttemptFinishedBlks + * set of storage movement attempt finished blocks. + */ + public void handleStorageMovementAttemptFinishedBlks( + BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) { + if (moveAttemptFinishedBlks.getBlocks().length <= 0) { + return; + } + storageMovementsMonitor + .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks()); + } + + @VisibleForTesting + BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { + return storageMovementsMonitor; + } + + /** + * Clear the queues from to be storage movement needed lists and items tracked + * in storage movement monitor. + */ + public void clearQueues() { + LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, " + + "user requests on satisfying block storages would be discarded."); + storageMovementNeeded.clearAll(); + } + + /** + * Set file inode in queue for which storage movement needed for its blocks. + * + * @param inodeId + * - file inode/blockcollection id. + */ + public void satisfyStoragePolicy(Long inodeId) { + //For file startId and trackId is same + storageMovementNeeded.add(new ItemInfo(inodeId, inodeId)); + if (LOG.isDebugEnabled()) { + LOG.debug("Added track info for inode {} to block " + + "storageMovementNeeded queue", inodeId); + } + } + + public void addInodeToPendingDirQueue(long id) { + storageMovementNeeded.addToPendingDirQueue(id); + } + + /** + * Clear queues for given track id. + */ + public void clearQueue(long trackId) { + storageMovementNeeded.clearQueue(trackId); + } + + /** + * ItemInfo is a file info object for which need to satisfy the + * policy. + */ + public static class ItemInfo { + private long startId; + private long trackId; + private int retryCount; + + public ItemInfo(long startId, long trackId) { + this.startId = startId; + this.trackId = trackId; + //set 0 when item is getting added first time in queue. + this.retryCount = 0; + } + + public ItemInfo(long startId, long trackId, int retryCount) { + this.startId = startId; + this.trackId = trackId; + this.retryCount = retryCount; + } + + /** + * Return the start inode id of the current track Id. + */ + public long getStartId() { + return startId; + } + + /** + * Return the File inode Id for which needs to satisfy the policy. + */ + public long getTrackId() { + return trackId; + } + + /** + * Returns true if the tracking path is a directory, false otherwise. + */ + public boolean isDir() { + return (startId != trackId); + } + + /** + * Get the attempted retry count of the block for satisfy the policy. + */ + public int getRetryCount() { + return retryCount; + } + } + + /** + * This class contains information of an attempted blocks and its last + * attempted or reported time stamp. This is used by + * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. + */ + final static class AttemptedItemInfo extends ItemInfo { + private long lastAttemptedOrReportedTime; + private final List<Block> blocks; + + /** + * AttemptedItemInfo constructor. + * + * @param rootId + * rootId for trackId + * @param trackId + * trackId for file. + * @param lastAttemptedOrReportedTime + * last attempted or reported time + */ + AttemptedItemInfo(long rootId, long trackId, + long lastAttemptedOrReportedTime, + List<Block> blocks, int retryCount) { + super(rootId, trackId, retryCount); + this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime; + this.blocks = blocks; + } + + /** + * @return last attempted or reported time stamp. + */ + long getLastAttemptedOrReportedTime() { + return lastAttemptedOrReportedTime; + } + + /** + * Update lastAttemptedOrReportedTime, so that the expiration time will be + * postponed to future. + */ + void touchLastReportedTimeStamp() { + this.lastAttemptedOrReportedTime = monotonicNow(); + } + + List<Block> getBlocks() { + return this.blocks; + } + + } + + public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( + String path) throws IOException { + INode inode = namesystem.getFSDirectory().getINode(path); + return storageMovementNeeded.getStatus(inode.getId()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d37661/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java new file mode 100644 index 0000000..d1d69fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package provides a mechanism for satisfying the storage policy of a + * path. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.hdfs.server.namenode.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d37661/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java deleted file mode 100644 index d4ccb3e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import static org.apache.hadoop.util.Time.monotonicNow; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo; - -import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * Tests that block storage movement attempt failures are reported from DN and - * processed them correctly or not. - */ -public class TestBlockStorageMovementAttemptedItems { - - private BlockStorageMovementAttemptedItems bsmAttemptedItems = null; - private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null; - private final int selfRetryTimeout = 500; - - @Before - public void setup() throws Exception { - unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded( - Mockito.mock(Namesystem.class), - Mockito.mock(StoragePolicySatisfier.class), 100); - bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, - selfRetryTimeout, unsatisfiedStorageMovementFiles); - } - - @After - public void teardown() { - if (bsmAttemptedItems != null) { - bsmAttemptedItems.stop(); - bsmAttemptedItems.stopGracefully(); - } - } - - private boolean checkItemMovedForRetry(Long item, long retryTimeout) - throws InterruptedException { - long stopTime = monotonicNow() + (retryTimeout * 2); - boolean isItemFound = false; - while (monotonicNow() < (stopTime)) { - ItemInfo ele = null; - while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { - if (item == ele.getTrackId()) { - isItemFound = true; - break; - } - } - if (!isItemFound) { - Thread.sleep(100); - } else { - break; - } - } - return isItemFound; - } - - /** - * Verify that moved blocks reporting should queued up the block info. - */ - @Test(timeout = 30000) - public void testAddReportedMoveAttemptFinishedBlocks() throws Exception { - bsmAttemptedItems.start(); // start block movement result monitor thread - Long item = new Long(1234); - List<Block> blocks = new ArrayList<Block>(); - blocks.add(new Block(item)); - bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0)); - Block[] blockArray = new Block[blocks.size()]; - blocks.toArray(blockArray); - bsmAttemptedItems.addReportedMovedBlocks(blockArray); - assertEquals("Failed to receive result!", 1, - bsmAttemptedItems.getMovementFinishedBlocksCount()); - } - - /** - * Verify empty moved blocks reporting queue. - */ - @Test(timeout = 30000) - public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception { - bsmAttemptedItems.start(); // start block movement report monitor thread - Long item = new Long(1234); - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0)); - assertEquals("Shouldn't receive result", 0, - bsmAttemptedItems.getMovementFinishedBlocksCount()); - assertEquals("Item doesn't exist in the attempted list", 1, - bsmAttemptedItems.getAttemptedItemsCount()); - } - - /** - * Partial block movement with - * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here, first occurrence - * is #blockStorageMovementReportedItemsCheck() and then - * #blocksStorageMovementUnReportedItemsCheck(). - */ - @Test(timeout = 30000) - public void testPartialBlockMovementShouldBeRetried1() throws Exception { - Long item = new Long(1234); - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - blocks.add(new Block(5678L)); - Long trackID = 0L; - bsmAttemptedItems - .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); - Block[] blksMovementReport = new Block[1]; - blksMovementReport[0] = new Block(item); - bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport); - - // start block movement report monitor thread - bsmAttemptedItems.start(); - assertTrue("Failed to add to the retry list", - checkItemMovedForRetry(trackID, 5000)); - assertEquals("Failed to remove from the attempted list", 0, - bsmAttemptedItems.getAttemptedItemsCount()); - } - - /** - * Partial block movement. Here, first occurrence is - * #blocksStorageMovementUnReportedItemsCheck() and then - * #blockStorageMovementReportedItemsCheck(). - */ - @Test(timeout = 30000) - public void testPartialBlockMovementShouldBeRetried2() throws Exception { - Long item = new Long(1234); - Long trackID = 0L; - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - bsmAttemptedItems - .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); - Block[] blksMovementReport = new Block[1]; - blksMovementReport[0] = new Block(item); - bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport); - - Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out - - bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck(); - bsmAttemptedItems.blockStorageMovementReportedItemsCheck(); - - assertTrue("Failed to add to the retry list", - checkItemMovedForRetry(trackID, 5000)); - assertEquals("Failed to remove from the attempted list", 0, - bsmAttemptedItems.getAttemptedItemsCount()); - } - - /** - * Partial block movement with only BlocksStorageMoveAttemptFinished report - * and storageMovementAttemptedItems list is empty. - */ - @Test(timeout = 30000) - public void testPartialBlockMovementWithEmptyAttemptedQueue() - throws Exception { - Long item = new Long(1234); - Long trackID = 0L; - List<Block> blocks = new ArrayList<>(); - blocks.add(new Block(item)); - bsmAttemptedItems - .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0)); - Block[] blksMovementReport = new Block[1]; - blksMovementReport[0] = new Block(item); - bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport); - assertFalse( - "Should not add in queue again if it is not there in" - + " storageMovementAttemptedItems", - checkItemMovedForRetry(trackID, 5000)); - assertEquals("Failed to remove from the attempted list", 1, - bsmAttemptedItems.getAttemptedItemsCount()); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org