HDFS-13033: [SPS]: Implement a mechanism to do file block movements for external SPS. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b0cb8d9b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b0cb8d9b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b0cb8d9b Branch: refs/heads/HDFS-10285 Commit: b0cb8d9bb44c963ae686d2b5c1b70bc76b955e10 Parents: 3159b39 Author: Uma Maheswara Rao G <uma.ganguma...@intel.com> Authored: Tue Jan 23 16:19:46 2018 -0800 Committer: Uma Maheswara Rao Gangumalla <umamah...@apache.org> Committed: Sun Aug 12 03:06:03 2018 -0700 ---------------------------------------------------------------------- .../hdfs/server/balancer/NameNodeConnector.java | 8 + .../hdfs/server/common/sps/BlockDispatcher.java | 186 +++++++++++++ .../sps/BlockMovementAttemptFinished.java | 80 ++++++ .../server/common/sps/BlockMovementStatus.java | 53 ++++ .../common/sps/BlockStorageMovementTracker.java | 184 +++++++++++++ .../sps/BlocksMovementsStatusHandler.java | 95 +++++++ .../hdfs/server/common/sps/package-info.java | 27 ++ .../datanode/BlockStorageMovementTracker.java | 186 ------------- .../datanode/StoragePolicySatisfyWorker.java | 271 ++----------------- .../hdfs/server/namenode/FSNamesystem.java | 4 +- .../namenode/sps/BlockMoveTaskHandler.java | 3 +- .../sps/BlockStorageMovementAttemptedItems.java | 12 +- .../IntraSPSNameNodeBlockMoveTaskHandler.java | 3 +- .../hdfs/server/namenode/sps/SPSService.java | 14 +- .../namenode/sps/StoragePolicySatisfier.java | 30 +- .../sps/ExternalSPSBlockMoveTaskHandler.java | 233 ++++++++++++++++ .../TestBlockStorageMovementAttemptedItems.java | 2 +- .../sps/TestStoragePolicySatisfier.java | 6 +- .../sps/TestExternalStoragePolicySatisfier.java | 69 ++++- 19 files changed, 997 insertions(+), 469 deletions(-) ---------------------------------------------------------------------- 
/**
 * Returns fallbackToSimpleAuth. This will be true or false during calls to
 * indicate if a secure client falls back to simple auth.
 *
 * @return the {@code fallbackToSimpleAuth} reference itself (no copy is
 *         made), so the caller observes the same {@link AtomicBoolean}
 *         object this class uses
 */
public AtomicBoolean getFallbackToSimpleAuth() {
  return fallbackToSimpleAuth;
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common.sps; + +import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dispatching block replica moves between datanodes to satisfy the storage + * policy. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockDispatcher { + private static final Logger LOG = LoggerFactory + .getLogger(BlockDispatcher.class); + + private final boolean connectToDnViaHostname; + private final int socketTimeout; + private final int ioFileBufferSize; + + /** + * Construct block dispatcher details. + * + * @param sockTimeout + * soTimeout + * @param ioFileBuffSize + * file io buffer size + * @param connectToDatanodeViaHostname + * true represents connect via hostname, false otw + */ + public BlockDispatcher(int sockTimeout, int ioFileBuffSize, + boolean connectToDatanodeViaHostname) { + this.socketTimeout = sockTimeout; + this.ioFileBufferSize = ioFileBuffSize; + this.connectToDnViaHostname = connectToDatanodeViaHostname; + } + + /** + * Moves the given block replica to the given target node and wait for the + * response. 
+ * + * @param blkMovingInfo + * block to storage info + * @param saslClient + * SASL for DataTransferProtocol on behalf of a client + * @param eb + * extended block info + * @param sock + * target node's socket + * @param km + * for creation of an encryption key + * @param accessToken + * connection block access token + * @return status of the block movement + */ + public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo, + SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock, + DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) { + LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy " + + "storageType, sourceStoragetype:{} and destinStoragetype:{}", + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), blkMovingInfo.getSourceStorageType(), + blkMovingInfo.getTargetStorageType()); + DataOutputStream out = null; + DataInputStream in = null; + try { + NetUtils.connect(sock, + NetUtils.createSocketAddr( + blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)), + socketTimeout); + // Set read timeout so that it doesn't hang forever against + // unresponsive nodes. Datanode normally sends IN_PROGRESS response + // twice within the client read timeout period (every 30 seconds by + // default). Here, we make it give up after "socketTimeout * 5" period + // of no response. 
+ sock.setSoTimeout(socketTimeout * 5); + sock.setKeepAlive(true); + OutputStream unbufOut = sock.getOutputStream(); + InputStream unbufIn = sock.getInputStream(); + LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget()); + + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, km, accessToken, blkMovingInfo.getTarget()); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + out = new DataOutputStream( + new BufferedOutputStream(unbufOut, ioFileBufferSize)); + in = new DataInputStream( + new BufferedInputStream(unbufIn, ioFileBufferSize)); + sendRequest(out, eb, accessToken, blkMovingInfo.getSource(), + blkMovingInfo.getTargetStorageType()); + receiveResponse(in); + + LOG.info( + "Successfully moved block:{} from src:{} to destin:{} for" + + " satisfying storageType:{}", + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType()); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; + } catch (BlockPinningException e) { + // Pinned block won't be able to move to a different node. So, its not + // required to do retries, just marked as SUCCESS. + LOG.debug("Pinned block can't be moved, so skipping block:{}", + blkMovingInfo.getBlock(), e); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; + } catch (IOException e) { + // TODO: handle failure retries + LOG.warn( + "Failed to move block:{} from src:{} to destin:{} to satisfy " + + "storageType:{}", + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; + } finally { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeSocket(sock); + } + } + + /** Send a reportedBlock replace request to the output stream. 
*/ + private static void sendRequest(DataOutputStream out, ExtendedBlock eb, + Token<BlockTokenIdentifier> accessToken, DatanodeInfo source, + StorageType targetStorageType) throws IOException { + new Sender(out).replaceBlock(eb, targetStorageType, accessToken, + source.getDatanodeUuid(), source, null); + } + + /** Receive a reportedBlock copy response from the input stream. */ + private static void receiveResponse(DataInputStream in) throws IOException { + BlockOpResponseProto response = BlockOpResponseProto + .parseFrom(vintPrefixed(in)); + while (response.getStatus() == Status.IN_PROGRESS) { + // read intermediate responses + response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); + } + String logInfo = "reportedBlock move is failed"; + DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java new file mode 100644 index 0000000..419d806 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +/** + * This class represents status from a block movement task. This will have the + * information of the task which was successful or failed due to errors. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockMovementAttemptFinished { + private final Block block; + private final DatanodeInfo src; + private final DatanodeInfo target; + private final BlockMovementStatus status; + + /** + * Construct movement attempt finished info. + * + * @param block + * block + * @param src + * src datanode + * @param target + * target datanode + * @param status + * movement status + */ + public BlockMovementAttemptFinished(Block block, DatanodeInfo src, + DatanodeInfo target, BlockMovementStatus status) { + this.block = block; + this.src = src; + this.target = target; + this.status = status; + } + + /** + * @return details of the block, which attempted to move from src to target + * node. + */ + public Block getBlock() { + return block; + } + + /** + * @return block movement status code. 
+ */ + public BlockMovementStatus getStatus() { + return status; + } + + @Override + public String toString() { + return new StringBuilder().append("Block movement attempt finished(\n ") + .append(" block : ").append(block).append(" src node: ").append(src) + .append(" target node: ").append(target).append(" movement status: ") + .append(status).append(")").toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java new file mode 100644 index 0000000..f70d84f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementStatus.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.common.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Block movement status code. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum BlockMovementStatus { + /** Success. */ + DN_BLK_STORAGE_MOVEMENT_SUCCESS(0), + /** + * Failure due to generation time stamp mismatches or network errors + * or no available space. + */ + DN_BLK_STORAGE_MOVEMENT_FAILURE(-1); + + // TODO: need to support different type of failures. Failure due to network + // errors, block pinned, no space available etc. + + private final int code; + + BlockMovementStatus(int code) { + this.code = code; + } + + /** + * @return the status code. + */ + int getStatusCode() { + return code; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java new file mode 100644 index 0000000..b20d6cf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common.sps; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.Block; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to track the completion of block movement future tasks. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockStorageMovementTracker implements Runnable { + private static final Logger LOG = LoggerFactory + .getLogger(BlockStorageMovementTracker.class); + private final CompletionService<BlockMovementAttemptFinished> moverCompletionService; + private final BlocksMovementsStatusHandler blksMovementsStatusHandler; + + // Keeps the information - block vs its list of future move tasks + private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures; + private final Map<Block, List<BlockMovementAttemptFinished>> movementResults; + + private volatile boolean running = true; + + /** + * BlockStorageMovementTracker constructor. + * + * @param moverCompletionService + * completion service. 
+ * @param handler + * blocks movements status handler + */ + public BlockStorageMovementTracker( + CompletionService<BlockMovementAttemptFinished> moverCompletionService, + BlocksMovementsStatusHandler handler) { + this.moverCompletionService = moverCompletionService; + this.moverTaskFutures = new HashMap<>(); + this.blksMovementsStatusHandler = handler; + this.movementResults = new HashMap<>(); + } + + @Override + public void run() { + while (running) { + if (moverTaskFutures.size() <= 0) { + try { + synchronized (moverTaskFutures) { + // Waiting for mover tasks. + moverTaskFutures.wait(2000); + } + } catch (InterruptedException ignore) { + // Sets interrupt flag of this thread. + Thread.currentThread().interrupt(); + } + } + try { + Future<BlockMovementAttemptFinished> future = + moverCompletionService.take(); + if (future != null) { + BlockMovementAttemptFinished result = future.get(); + LOG.debug("Completed block movement. {}", result); + Block block = result.getBlock(); + List<Future<BlockMovementAttemptFinished>> blocksMoving = + moverTaskFutures.get(block); + if (blocksMoving == null) { + LOG.warn("Future task doesn't exist for block : {} ", block); + continue; + } + blocksMoving.remove(future); + + List<BlockMovementAttemptFinished> resultPerTrackIdList = + addMovementResultToBlockIdList(result); + + // Completed all the scheduled blocks movement under this 'trackId'. + if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) { + synchronized (moverTaskFutures) { + moverTaskFutures.remove(block); + } + if (running) { + // handle completed or inprogress blocks movements per trackId. + blksMovementsStatusHandler.handle(resultPerTrackIdList); + } + movementResults.remove(block); + } + } + } catch (InterruptedException e) { + if (running) { + LOG.error("Exception while moving block replica to target storage" + + " type", e); + } + } catch (ExecutionException e) { + // TODO: Do we need failure retries and implement the same if required. 
+ LOG.error("Exception while moving block replica to target storage type", + e); + } + } + } + + private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList( + BlockMovementAttemptFinished result) { + Block block = result.getBlock(); + List<BlockMovementAttemptFinished> perBlockIdList; + synchronized (movementResults) { + perBlockIdList = movementResults.get(block); + if (perBlockIdList == null) { + perBlockIdList = new ArrayList<>(); + movementResults.put(block, perBlockIdList); + } + perBlockIdList.add(result); + } + return perBlockIdList; + } + + /** + * Add future task to the tracking list to check the completion status of the + * block movement. + * + * @param blockID + * block identifier + * @param futureTask + * future task used for moving the respective block + */ + public void addBlock(Block block, + Future<BlockMovementAttemptFinished> futureTask) { + synchronized (moverTaskFutures) { + List<Future<BlockMovementAttemptFinished>> futures = + moverTaskFutures.get(block); + // null for the first task + if (futures == null) { + futures = new ArrayList<>(); + moverTaskFutures.put(block, futures); + } + futures.add(futureTask); + // Notify waiting tracker thread about the newly added tasks. + moverTaskFutures.notify(); + } + } + + /** + * Clear the pending movement and movement result queues. + */ + public void removeAll() { + synchronized (moverTaskFutures) { + moverTaskFutures.clear(); + } + synchronized (movementResults) { + movementResults.clear(); + } + } + + /** + * Sets running flag to false and clear the pending movement result queues. 
+ */ + public void stopTracking() { + running = false; + removeAll(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java new file mode 100644 index 0000000..f9f3954 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.common.sps; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.Block; + +/** + * Blocks movements status handler, which is used to collect details of the + * completed block movements and later these attempted finished(with success or + * failure) blocks can be accessed to notify respective listeners, if any. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlocksMovementsStatusHandler { + private final List<Block> blockIdVsMovementStatus = new ArrayList<>(); + + /** + * Collect all the storage movement attempt finished blocks. Later this will + * be send to namenode via heart beat. + * + * @param moveAttemptFinishedBlks + * set of storage movement attempt finished blocks + */ + public void handle( + List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) { + List<Block> blocks = new ArrayList<>(); + + for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { + blocks.add(item.getBlock()); + } + // Adding to the tracking report list. Later this can be accessed to know + // the attempted block movements. + synchronized (blockIdVsMovementStatus) { + blockIdVsMovementStatus.addAll(blocks); + } + } + + /** + * @return unmodifiable list of storage movement attempt finished blocks. + */ + public List<Block> getMoveAttemptFinishedBlocks() { + List<Block> moveAttemptFinishedBlks = new ArrayList<>(); + // 1. Adding all the completed block ids. + synchronized (blockIdVsMovementStatus) { + if (blockIdVsMovementStatus.size() > 0) { + moveAttemptFinishedBlks = Collections + .unmodifiableList(blockIdVsMovementStatus); + } + } + return moveAttemptFinishedBlks; + } + + /** + * Remove the storage movement attempt finished blocks from the tracking list. 
+ * + * @param moveAttemptFinishedBlks + * set of storage movement attempt finished blocks + */ + public void remove(List<Block> moveAttemptFinishedBlks) { + if (moveAttemptFinishedBlks != null) { + blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks); + } + } + + /** + * Clear the blockID vs movement status tracking map. + */ + public void removeAll() { + synchronized (blockIdVsMovementStatus) { + blockIdVsMovementStatus.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java new file mode 100644 index 0000000..fcffbe9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/package-info.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package provides commonly used classes for the block movement. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.hdfs.server.common.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java deleted file mode 100644 index b3b9fd9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished; -import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is used to track the completion of block movement future tasks. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class BlockStorageMovementTracker implements Runnable { - private static final Logger LOG = LoggerFactory - .getLogger(BlockStorageMovementTracker.class); - private final CompletionService<BlockMovementAttemptFinished> moverCompletionService; - private final BlocksMovementsStatusHandler blksMovementsStatusHandler; - - // Keeps the information - block vs its list of future move tasks - private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures; - private final Map<Block, List<BlockMovementAttemptFinished>> movementResults; - - private volatile boolean running = true; - - /** - * BlockStorageMovementTracker constructor. - * - * @param moverCompletionService - * completion service. 
- * @param handler - * blocks movements status handler - */ - public BlockStorageMovementTracker( - CompletionService<BlockMovementAttemptFinished> moverCompletionService, - BlocksMovementsStatusHandler handler) { - this.moverCompletionService = moverCompletionService; - this.moverTaskFutures = new HashMap<>(); - this.blksMovementsStatusHandler = handler; - this.movementResults = new HashMap<>(); - } - - @Override - public void run() { - while (running) { - if (moverTaskFutures.size() <= 0) { - try { - synchronized (moverTaskFutures) { - // Waiting for mover tasks. - moverTaskFutures.wait(2000); - } - } catch (InterruptedException ignore) { - // Sets interrupt flag of this thread. - Thread.currentThread().interrupt(); - } - } - try { - Future<BlockMovementAttemptFinished> future = - moverCompletionService.take(); - if (future != null) { - BlockMovementAttemptFinished result = future.get(); - LOG.debug("Completed block movement. {}", result); - Block block = result.getBlock(); - List<Future<BlockMovementAttemptFinished>> blocksMoving = - moverTaskFutures.get(block); - if (blocksMoving == null) { - LOG.warn("Future task doesn't exist for block : {} ", block); - continue; - } - blocksMoving.remove(future); - - List<BlockMovementAttemptFinished> resultPerTrackIdList = - addMovementResultToBlockIdList(result); - - // Completed all the scheduled blocks movement under this 'trackId'. - if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) { - synchronized (moverTaskFutures) { - moverTaskFutures.remove(block); - } - if (running) { - // handle completed or inprogress blocks movements per trackId. - blksMovementsStatusHandler.handle(resultPerTrackIdList); - } - movementResults.remove(block); - } - } - } catch (InterruptedException e) { - if (running) { - LOG.error("Exception while moving block replica to target storage" - + " type", e); - } - } catch (ExecutionException e) { - // TODO: Do we need failure retries and implement the same if required. 
- LOG.error("Exception while moving block replica to target storage type", - e); - } - } - } - - private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList( - BlockMovementAttemptFinished result) { - Block block = result.getBlock(); - List<BlockMovementAttemptFinished> perBlockIdList; - synchronized (movementResults) { - perBlockIdList = movementResults.get(block); - if (perBlockIdList == null) { - perBlockIdList = new ArrayList<>(); - movementResults.put(block, perBlockIdList); - } - perBlockIdList.add(result); - } - return perBlockIdList; - } - - /** - * Add future task to the tracking list to check the completion status of the - * block movement. - * - * @param blockID - * block identifier - * @param futureTask - * future task used for moving the respective block - */ - void addBlock(Block block, - Future<BlockMovementAttemptFinished> futureTask) { - synchronized (moverTaskFutures) { - List<Future<BlockMovementAttemptFinished>> futures = - moverTaskFutures.get(block); - // null for the first task - if (futures == null) { - futures = new ArrayList<>(); - moverTaskFutures.put(block, futures); - } - futures.add(futureTask); - // Notify waiting tracker thread about the newly added tasks. - moverTaskFutures.notify(); - } - } - - /** - * Clear the pending movement and movement result queues. - */ - void removeAll() { - synchronized (moverTaskFutures) { - moverTaskFutures.clear(); - } - synchronized (movementResults) { - movementResults.clear(); - } - } - - /** - * Sets running flag to false and clear the pending movement result queues. 
- */ - public void stopTracking() { - running = false; - removeAll(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index 9a9c7e0..42f2e93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -17,21 +17,9 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; @@ -47,20 +35,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException; -import 
org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; +import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus; +import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker; +import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; +import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; @@ -81,7 +64,6 @@ public class StoragePolicySatisfyWorker { .getLogger(StoragePolicySatisfyWorker.class); private final DataNode datanode; - private final int ioFileBufferSize; private final int moverThreads; private final ExecutorService moveExecutor; @@ -89,10 +71,10 @@ public class StoragePolicySatisfyWorker { private final BlocksMovementsStatusHandler handler; private final BlockStorageMovementTracker movementTracker; private Daemon movementTrackerThread; + private final BlockDispatcher blkDispatcher; public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; - this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); @@ -103,7 +85,10 @@ public class 
StoragePolicySatisfyWorker { handler); movementTrackerThread = new Daemon(movementTracker); movementTrackerThread.setName("BlockStorageMovementTracker"); - + DNConf dnConf = datanode.getDnConf(); + int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); + blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(), + ioFileBufferSize, dnConf.getConnectToDnViaHostname()); // TODO: Needs to manage the number of concurrent moves per DataNode. } @@ -183,8 +168,7 @@ public class StoragePolicySatisfyWorker { assert sourceStorageType != targetStorageType : "Source and Target storage type shouldn't be same!"; BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID, - blkMovingInfo.getBlock(), blkMovingInfo.getSource(), - blkMovingInfo.getTarget(), sourceStorageType, targetStorageType); + blkMovingInfo); Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService .submit(blockMovingTask); movementTracker.addBlock(blkMovingInfo.getBlock(), @@ -199,244 +183,45 @@ public class StoragePolicySatisfyWorker { private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> { private final String blockPoolID; - private final Block block; - private final DatanodeInfo source; - private final DatanodeInfo target; - private final StorageType srcStorageType; - private final StorageType targetStorageType; + private final BlockMovingInfo blkMovingInfo; - BlockMovingTask(String blockPoolID, Block block, - DatanodeInfo source, DatanodeInfo target, - StorageType srcStorageType, StorageType targetStorageType) { + BlockMovingTask(String blockPoolID, BlockMovingInfo blkMovInfo) { this.blockPoolID = blockPoolID; - this.block = block; - this.source = source; - this.target = target; - this.srcStorageType = srcStorageType; - this.targetStorageType = targetStorageType; + this.blkMovingInfo = blkMovInfo; } @Override public BlockMovementAttemptFinished call() { BlockMovementStatus status = moveBlock(); - return new BlockMovementAttemptFinished(block, 
source, target, status); + return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(), + blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status); } private BlockMovementStatus moveBlock() { - LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy " - + "storageType, sourceStoragetype:{} and destinStoragetype:{}", - block, source, target, srcStorageType, targetStorageType); - Socket sock = null; - DataOutputStream out = null; - DataInputStream in = null; + datanode.incrementXmitsInProgress(); + ExtendedBlock eb = new ExtendedBlock(blockPoolID, + blkMovingInfo.getBlock()); try { - datanode.incrementXmitsInProgress(); - - ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block); - DNConf dnConf = datanode.getDnConf(); - - String dnAddr = datanode.getDatanodeId() - .getXferAddr(dnConf.getConnectToDnViaHostname()); - sock = datanode.newSocket(); - NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), - dnConf.getSocketTimeout()); - sock.setSoTimeout(2 * dnConf.getSocketTimeout()); - LOG.debug("Connecting to datanode {}", dnAddr); - - OutputStream unbufOut = sock.getOutputStream(); - InputStream unbufIn = sock.getInputStream(); Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken( - extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - new StorageType[]{targetStorageType}, new String[0]); - + eb, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), + new StorageType[]{blkMovingInfo.getTargetStorageType()}, + new String[0]); DataEncryptionKeyFactory keyFactory = datanode - .getDataEncryptionKeyFactoryForBlock(extendedBlock); - IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock, - unbufOut, unbufIn, keyFactory, accessToken, target); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream( - new BufferedOutputStream(unbufOut, ioFileBufferSize)); - in = new DataInputStream( - new BufferedInputStream(unbufIn, ioFileBufferSize)); - sendRequest(out, 
extendedBlock, accessToken, source, targetStorageType); - receiveResponse(in); + .getDataEncryptionKeyFactoryForBlock(eb); - LOG.info( - "Successfully moved block:{} from src:{} to destin:{} for" - + " satisfying storageType:{}", - block, source, target, targetStorageType); - return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; - } catch (BlockPinningException e) { - // Pinned block won't be able to move to a different node. So, its not - // required to do retries, just marked as SUCCESS. - LOG.debug("Pinned block can't be moved, so skipping block:{}", block, - e); - return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; + return blkDispatcher.moveBlock(blkMovingInfo, + datanode.getSaslClient(), eb, datanode.newSocket(), + keyFactory, accessToken); } catch (IOException e) { // TODO: handle failure retries LOG.warn( "Failed to move block:{} from src:{} to destin:{} to satisfy " + "storageType:{}", - block, source, target, targetStorageType, e); + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e); return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; } finally { datanode.decrementXmitsInProgress(); - IOUtils.closeStream(out); - IOUtils.closeStream(in); - IOUtils.closeSocket(sock); - } - } - - /** Send a reportedBlock replace request to the output stream. */ - private void sendRequest(DataOutputStream out, ExtendedBlock eb, - Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn, - StorageType destinStorageType) throws IOException { - new Sender(out).replaceBlock(eb, destinStorageType, accessToken, - srcDn.getDatanodeUuid(), srcDn, null); - } - - /** Receive a reportedBlock copy response from the input stream. 
*/ - private void receiveResponse(DataInputStream in) throws IOException { - BlockOpResponseProto response = BlockOpResponseProto - .parseFrom(vintPrefixed(in)); - while (response.getStatus() == Status.IN_PROGRESS) { - // read intermediate responses - response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); - } - String logInfo = "reportedBlock move is failed"; - DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true); - } - } - - /** - * Block movement status code. - */ - public enum BlockMovementStatus { - /** Success. */ - DN_BLK_STORAGE_MOVEMENT_SUCCESS(0), - /** - * Failure due to generation time stamp mismatches or network errors - * or no available space. - */ - DN_BLK_STORAGE_MOVEMENT_FAILURE(-1); - - // TODO: need to support different type of failures. Failure due to network - // errors, block pinned, no space available etc. - - private final int code; - - BlockMovementStatus(int code) { - this.code = code; - } - - /** - * @return the status code. - */ - int getStatusCode() { - return code; - } - } - - /** - * This class represents status from a block movement task. This will have the - * information of the task which was successful or failed due to errors. 
- */ - static class BlockMovementAttemptFinished { - private final Block block; - private final DatanodeInfo src; - private final DatanodeInfo target; - private final BlockMovementStatus status; - - BlockMovementAttemptFinished(Block block, DatanodeInfo src, - DatanodeInfo target, BlockMovementStatus status) { - this.block = block; - this.src = src; - this.target = target; - this.status = status; - } - - Block getBlock() { - return block; - } - - BlockMovementStatus getStatus() { - return status; - } - - @Override - public String toString() { - return new StringBuilder().append("Block movement attempt finished(\n ") - .append(" block : ") - .append(block).append(" src node: ").append(src) - .append(" target node: ").append(target) - .append(" movement status: ").append(status).append(")").toString(); - } - } - - /** - * Blocks movements status handler, which is used to collect details of the - * completed block movements and it will send these attempted finished(with - * success or failure) blocks to the namenode via heartbeat. - */ - public static class BlocksMovementsStatusHandler { - private final List<Block> blockIdVsMovementStatus = - new ArrayList<>(); - - /** - * Collect all the storage movement attempt finished blocks. Later this will - * be send to namenode via heart beat. - * - * @param moveAttemptFinishedBlks - * set of storage movement attempt finished blocks - */ - void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) { - List<Block> blocks = new ArrayList<>(); - - for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { - blocks.add(item.getBlock()); - } - // Adding to the tracking report list. Later this will be send to - // namenode via datanode heartbeat. - synchronized (blockIdVsMovementStatus) { - blockIdVsMovementStatus.addAll(blocks); - } - } - - /** - * @return unmodifiable list of storage movement attempt finished blocks. 
- */ - List<Block> getMoveAttemptFinishedBlocks() { - List<Block> moveAttemptFinishedBlks = new ArrayList<>(); - // 1. Adding all the completed block ids. - synchronized (blockIdVsMovementStatus) { - if (blockIdVsMovementStatus.size() > 0) { - moveAttemptFinishedBlks = Collections - .unmodifiableList(blockIdVsMovementStatus); - } - } - return moveAttemptFinishedBlks; - } - - /** - * Remove the storage movement attempt finished blocks from the tracking - * list. - * - * @param moveAttemptFinishedBlks - * set of storage movement attempt finished blocks - */ - void remove(List<Block> moveAttemptFinishedBlks) { - if (moveAttemptFinishedBlks != null) { - blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks); - } - } - - /** - * Clear the blockID vs movement status tracking map. - */ - void removeAll() { - synchronized (blockIdVsMovementStatus) { - blockIdVsMovementStatus.clear(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ed1c823..37322e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1299,7 +1299,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockManager.getSPSService()), new IntraSPSNameNodeFileIdCollector(getFSDirectory(), blockManager.getSPSService()), - new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this)); + new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null); blockManager.startSPS(); } finally { 
startingActiveService = false; @@ -3996,7 +3996,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, + " movement attempt finished block info sent by DN"); } } else { - sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished); + sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java index e6f78e1..1b11d01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java @@ -38,7 +38,6 @@ public interface BlockMoveTaskHandler { * contain the required info to move the block, that source location, * destination location and storage types. 
*/ - void submitMoveTask(BlockMovingInfo blkMovingInfo, - BlockMovementListener blockMoveCompletionListener) throws IOException; + void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java index 3f0155d..ea7a093 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java @@ -46,8 +46,7 @@ import com.google.common.annotations.VisibleForTesting; * finished for a longer time period, then such items will retries automatically * after timeout. The default timeout would be 5 minutes. */ -public class BlockStorageMovementAttemptedItems - implements BlockMovementListener { +public class BlockStorageMovementAttemptedItems{ private static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); @@ -59,6 +58,7 @@ public class BlockStorageMovementAttemptedItems private final List<Block> movementFinishedBlocks; private volatile boolean monitorRunning = true; private Daemon timerThread = null; + private final BlockMovementListener blkMovementListener; // // It might take anywhere between 5 to 10 minutes before // a request is timed out. 
@@ -74,7 +74,8 @@ public class BlockStorageMovementAttemptedItems private final SPSService service; public BlockStorageMovementAttemptedItems(SPSService service, - BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) { + BlockStorageMovementNeeded unsatisfiedStorageMovementFiles, + BlockMovementListener blockMovementListener) { this.service = service; long recheckTimeout = this.service.getConf().getLong( DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, @@ -89,6 +90,7 @@ public class BlockStorageMovementAttemptedItems this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles; storageMovementAttemptedItems = new ArrayList<>(); movementFinishedBlocks = new ArrayList<>(); + this.blkMovementListener = blockMovementListener; } /** @@ -118,6 +120,10 @@ public class BlockStorageMovementAttemptedItems synchronized (movementFinishedBlocks) { movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks)); } + // External listener if it is plugged-in + if (blkMovementListener != null) { + blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java index b27e8c9..d6e92d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java @@ -44,8 +44,7 @@ public class 
IntraSPSNameNodeBlockMoveTaskHandler } @Override - public void submitMoveTask(BlockMovingInfo blkMovingInfo, - BlockMovementListener blockMoveCompletionListener) throws IOException { + public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException { namesystem.readLock(); try { DatanodeDescriptor dn = blockManager.getDatanodeManager() http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index d74e391..ecc6ceb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; /** * An interface for SPSService, which exposes life cycle and processing APIs. @@ -41,9 +42,11 @@ public interface SPSService { * id * @param handler * - a helper service for moving the blocks + * @param blkMovementListener + * - listener to know about block movement attempt completion */ void init(Context ctxt, FileIdCollector fileIDCollector, - BlockMoveTaskHandler handler); + BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener); /** * Starts the SPS service. Make sure to initialize the helper services before @@ -112,4 +115,13 @@ public interface SPSService { * - directory inode id. 
*/ void markScanCompletedForPath(Long inodeId); + + /** + * Notify the details of storage movement attempt finished blocks. + * + * @param moveAttemptFinishedBlks + * - array contains all the blocks that are attempted to move + */ + void notifyStorageMovementAttemptFinishedBlks( + BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index aafdc65..9ba8af7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -132,13 +132,14 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } public void init(final Context context, final FileIdCollector fileIDCollector, - final BlockMoveTaskHandler blockMovementTaskHandler) { + final BlockMoveTaskHandler blockMovementTaskHandler, + final BlockMovementListener blockMovementListener) { this.ctxt = context; this.storageMovementNeeded = new BlockStorageMovementNeeded(context, fileIDCollector); this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(this, - storageMovementNeeded); + storageMovementNeeded, blockMovementListener); this.blockMoveTaskHandler = blockMovementTaskHandler; this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf()); this.blockMovementMaxRetry = getConf().getInt( @@ -291,6 +292,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { + " back to retry queue as 
some of the blocks" + " are low redundant."); } + itemInfo.increRetryCount(); this.storageMovementNeeded.add(itemInfo); break; case BLOCKS_FAILED_TO_MOVE: @@ -410,15 +412,18 @@ public class StoragePolicySatisfier implements SPSService, Runnable { liveDns, ecPolicy); if (blocksPaired) { status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED; - } else { - // none of the blocks found its eligible targets for satisfying the - // storage policy. + } else + if (status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) { + // Check if the previous block was successfully paired. Here the + // status will set to NO_BLOCKS_TARGETS_PAIRED only when none of the + // blocks of a file found its eligible targets to satisfy the storage + // policy. status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED; } - } else { - if (hasLowRedundancyBlocks) { - status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; - } + } else if (hasLowRedundancyBlocks + && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) { + // Check if the previous block was successfully paired. + status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; } } @@ -426,8 +431,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen try { - blockMoveTaskHandler.submitMoveTask(blkMovingInfo, - storageMovementsMonitor); + blockMoveTaskHandler.submitMoveTask(blkMovingInfo); LOG.debug("BlockMovingInfo: {}", blkMovingInfo); assignedBlockIds.add(blkMovingInfo.getBlock()); blockCount++; @@ -823,7 +827,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * @param moveAttemptFinishedBlks * set of storage movement attempt finished blocks. 
*/ - public void handleStorageMovementAttemptFinishedBlks( + public void notifyStorageMovementAttemptFinishedBlks( BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) { if (moveAttemptFinishedBlks.getBlocks().length <= 0) { return; @@ -833,7 +837,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @VisibleForTesting - BlockMovementListener getAttemptedItemsMonitor() { + BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { return storageMovementsMonitor; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java new file mode 100644 index 0000000..a1c8eec --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.sps; + +import java.io.IOException; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.balancer.KeyManager; +import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; +import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus; +import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker; +import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; +import 
org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; +import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class handles the external SPS block movements. This will move the + * given block to a target datanode by directly establishing socket connection + * to it and invokes function + * {@link Sender#replaceBlock(ExtendedBlock, StorageType, Token, String, + * DatanodeInfo, String)}. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { + private static final Logger LOG = LoggerFactory + .getLogger(ExternalSPSBlockMoveTaskHandler.class); + + private final ExecutorService moveExecutor; + private final CompletionService<BlockMovementAttemptFinished> mCompletionServ; + private final NameNodeConnector nnc; + private final SaslDataTransferClient saslClient; + private final BlockStorageMovementTracker blkMovementTracker; + private Daemon movementTrackerThread; + private final SPSService service; + private final BlockDispatcher blkDispatcher; + + public ExternalSPSBlockMoveTaskHandler(Configuration conf, + NameNodeConnector nnc, SPSService spsService) { + int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, + DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); + moveExecutor = initializeBlockMoverThreadPool(moverThreads); + mCompletionServ = new ExecutorCompletionService<>(moveExecutor); + this.nnc = nnc; + this.saslClient = new SaslDataTransferClient(conf, + DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), 
+ nnc.getFallbackToSimpleAuth()); + this.blkMovementTracker = new BlockStorageMovementTracker( + mCompletionServ, new ExternalBlocksMovementsStatusHandler()); + this.service = spsService; + + boolean connectToDnViaHostname = conf.getBoolean( + HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, + HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); + blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT, + ioFileBufferSize, connectToDnViaHostname); + } + + /** + * Initializes block movement tracker daemon and starts the thread. + */ + void init() { + movementTrackerThread = new Daemon(this.blkMovementTracker); + movementTrackerThread.setName("BlockStorageMovementTracker"); + movementTrackerThread.start(); + } + + private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) { + LOG.debug("Block mover to satisfy storage policy; pool threads={}", num); + + ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("BlockMoverTask-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, + ThreadPoolExecutor e) { + LOG.info("Execution for block movement to satisfy storage policy" + + " got rejected, Executing in current thread"); + // will run in the current thread. + super.rejectedExecution(runnable, e); + } + }); + + moverThreadPool.allowCoreThreadTimeOut(true); + return moverThreadPool; + } + + @Override + public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException { + // TODO: Need to increment scheduled block size on the target node. 
This + // count will be used to calculate the remaining space of target datanode + // during block movement assignment logic. In the internal movement, + // remaining space is bookkeeping at the DatanodeDescriptor, please refer + // IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and + // updating via the funcation call - + // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); + LOG.debug("Received BlockMovingTask {}", blkMovingInfo); + BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo); + Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ + .submit(blockMovingTask); + blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable); + } + + private class ExternalBlocksMovementsStatusHandler + extends BlocksMovementsStatusHandler { + @Override + public void handle( + List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) { + List<Block> blocks = new ArrayList<>(); + for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { + blocks.add(item.getBlock()); + } + BlocksStorageMoveAttemptFinished blkAttempted = + new BlocksStorageMoveAttemptFinished( + blocks.toArray(new Block[blocks.size()])); + service.notifyStorageMovementAttemptFinishedBlks(blkAttempted); + } + } + + /** + * This class encapsulates the process of moving the block replica to the + * given target. 
+ */ + private class BlockMovingTask + implements Callable<BlockMovementAttemptFinished> { + private final BlockMovingInfo blkMovingInfo; + + BlockMovingTask(BlockMovingInfo blkMovingInfo) { + this.blkMovingInfo = blkMovingInfo; + } + + @Override + public BlockMovementAttemptFinished call() { + BlockMovementStatus blkMovementStatus = moveBlock(); + return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(), + blkMovingInfo.getSource(), blkMovingInfo.getTarget(), + blkMovementStatus); + } + + private BlockMovementStatus moveBlock() { + ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), + blkMovingInfo.getBlock()); + + final KeyManager km = nnc.getKeyManager(); + Token<BlockTokenIdentifier> accessToken; + try { + accessToken = km.getAccessToken(eb, + new StorageType[]{blkMovingInfo.getTargetStorageType()}, + new String[0]); + } catch (IOException e) { + // TODO: handle failure retries + LOG.warn( + "Failed to move block:{} from src:{} to destin:{} to satisfy " + + "storageType:{}", + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; + } + return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb, + new Socket(), km, accessToken); + } + } + + /** + * Cleanup the resources. 
+ */ + void cleanUp() { + blkMovementTracker.stopTracking(); + if (movementTrackerThread != null) { + movementTrackerThread.interrupt(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java index 3e2c324..4097339 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java @@ -56,7 +56,7 @@ public class TestBlockStorageMovementAttemptedItems { unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt, null); bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps, - unsatisfiedStorageMovementFiles); + unsatisfiedStorageMovementFiles, null); } @After http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index e0bf410..8115661 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -92,7 +92,7 @@ public class TestStoragePolicySatisfier { private static final String ONE_SSD = "ONE_SSD"; private static final String COLD = "COLD"; - private static final Logger LOG = + protected static final Logger LOG = LoggerFactory.getLogger(TestStoragePolicySatisfier.class); private Configuration config = null; private StorageType[][] allDiskTypes = @@ -1337,7 +1337,7 @@ public class TestStoragePolicySatisfier { }; FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); - sps.init(ctxt, fileIDCollector, null); + sps.init(ctxt, fileIDCollector, null, null); sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); @@ -1404,7 +1404,7 @@ public class TestStoragePolicySatisfier { } }; FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); - sps.init(ctxt, fileIDCollector, null); + sps.init(ctxt, fileIDCollector, null, null); sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0cb8d9b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 3ced34e..9a401bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -18,20 +18,33 @@ package org.apache.hadoop.hdfs.server.sps; import java.io.IOException; +import 
java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.Context; import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier; +import org.junit.Assert; import org.junit.Ignore; +import com.google.common.collect.Maps; + /** * Tests the external sps service plugins. 
*/ @@ -69,23 +82,24 @@ public class TestExternalStoragePolicySatisfier cluster.waitActive(); if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false)) { - SPSService spsService = cluster.getNameNode().getNamesystem() - .getBlockManager().getSPSService(); + BlockManager blkMgr = cluster.getNameNode().getNamesystem() + .getBlockManager(); + SPSService spsService = blkMgr.getSPSService(); spsService.stopGracefully(); IntraSPSNameNodeContext context = new IntraSPSNameNodeContext( cluster.getNameNode().getNamesystem(), - cluster.getNameNode().getNamesystem().getBlockManager(), cluster - .getNameNode().getNamesystem().getBlockManager().getSPSService()); - + blkMgr, blkMgr.getSPSService()); + ExternalBlockMovementListener blkMoveListener = + new ExternalBlockMovementListener(); + ExternalSPSBlockMoveTaskHandler externalHandler = + new ExternalSPSBlockMoveTaskHandler(conf, getNameNodeConnector(conf), + blkMgr.getSPSService()); + externalHandler.init(); spsService.init(context, - new ExternalSPSFileIDCollector(context, - cluster.getNameNode().getNamesystem().getBlockManager() - .getSPSService(), - 5), - new IntraSPSNameNodeBlockMoveTaskHandler( - cluster.getNameNode().getNamesystem().getBlockManager(), - cluster.getNameNode().getNamesystem())); + new ExternalSPSFileIDCollector(context, blkMgr.getSPSService(), 5), + externalHandler, + blkMoveListener); spsService.start(true); } return cluster; @@ -97,6 +111,35 @@ public class TestExternalStoragePolicySatisfier return new ExternalSPSFileIDCollector(ctxt, sps, 5); } + private class ExternalBlockMovementListener implements BlockMovementListener { + + private List<Block> actualBlockMovements = new ArrayList<>(); + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + for (Block block : moveAttemptFinishedBlks) { + actualBlockMovements.add(block); + } + LOG.info("Movement attempted blocks", actualBlockMovements); + } + } + + private NameNodeConnector 
getNameNodeConnector(Configuration conf) + throws IOException { + final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf); + Assert.assertEquals(1, namenodes.size()); + Map<URI, List<Path>> nnMap = Maps.newHashMap(); + for (URI nn : namenodes) { + nnMap.put(nn, null); + } + final Path externalSPSPathId = new Path("/system/externalSPS.id"); + final List<NameNodeConnector> nncs = NameNodeConnector + .newNameNodeConnectors(nnMap, + StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId, + conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS); + return nncs.get(0); + } + /** * This test need not run as external scan is not a batch based scanning right * now. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org