This is an automated email from the ASF dual-hosted git repository. ferhui pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 5412fbf6d4b HDFS-16460. [SPS]: Handle failure retries for moving tasks (#4001) 5412fbf6d4b is described below commit 5412fbf6d4b59836df6ad9118f230812c8a50f1c Author: litao <tomlees...@gmail.com> AuthorDate: Fri Apr 8 12:26:39 2022 +0800 HDFS-16460. [SPS]: Handle failure retries for moving tasks (#4001) --- .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++ .../hdfs/server/common/sps/BlockDispatcher.java | 10 +---- .../sps/ExternalSPSBlockMoveTaskHandler.java | 38 +++++++++++------- .../hdfs/server/sps/ExternalSPSFaultInjector.java | 46 ++++++++++++++++++++++ .../src/main/resources/hdfs-default.xml | 8 ++++ .../sps/TestExternalStoragePolicySatisfier.java | 23 +++++++++++ 6 files changed, 105 insertions(+), 24 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 6216f6e7a1d..cf1755cd9f9 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -827,6 +827,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.storage.policy.satisfier.retry.max.attempts"; public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT = 3; + public static final String DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY = + "dfs.storage.policy.satisfier.move.task.retry.max.attempts"; + public static final int DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT = + 3; public static final String DFS_STORAGE_DEFAULT_POLICY = "dfs.storage.default.policy"; public static final HdfsConstants.StoragePolicy diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java index f87fcaef054..f7756c74851 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java @@ -101,7 +101,7 @@ public class BlockDispatcher { */ public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo, SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock, - DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) { + DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) throws IOException { LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy " + "storageType, sourceStoragetype:{} and destinStoragetype:{}", blkMovingInfo.getBlock(), blkMovingInfo.getSource(), @@ -149,14 +149,6 @@ public class BlockDispatcher { LOG.debug("Pinned block can't be moved, so skipping block:{}", blkMovingInfo.getBlock(), e); return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS; - } catch (IOException e) { - // TODO: handle failure retries - LOG.warn( - "Failed to move block:{} from src:{} to destin:{} to satisfy " - + "storageType:{}", - blkMovingInfo.getBlock(), blkMovingInfo.getSource(), - blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e); - return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index 64dec8bbc5c..ec3837424cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -80,11 +80,15 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { private Daemon movementTrackerThread; private final SPSService service; private final BlockDispatcher blkDispatcher; + private final int maxRetry; public ExternalSPSBlockMoveTaskHandler(Configuration conf, NameNodeConnector nnc, SPSService spsService) { int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); + maxRetry = conf.getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT); moveExecutor = initializeBlockMoverThreadPool(moverThreads); mCompletionServ = new ExecutorCompletionService<>(moveExecutor); this.nnc = nnc; @@ -151,7 +155,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { // during block movement assignment logic. In the internal movement, // remaining space is bookkeeping at the DatanodeDescriptor, please refer // IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and - // updating via the funcation call - + // updating via the function call - // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType()); LOG.debug("Received BlockMovingTask {}", blkMovingInfo); BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo); @@ -195,21 +199,25 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { final KeyManager km = nnc.getKeyManager(); Token<BlockTokenIdentifier> accessToken; - try { - accessToken = km.getAccessToken(eb, - new StorageType[]{blkMovingInfo.getTargetStorageType()}, - new String[0]); - } catch (IOException e) { - // TODO: handle failure retries - LOG.warn( - "Failed to move block:{} from src:{} to destin:{} to satisfy " - + "storageType:{}", - blkMovingInfo.getBlock(), blkMovingInfo.getSource(), - blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e); - return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; + int retry = 0; + while (retry <= maxRetry) { + try { + ExternalSPSFaultInjector.getInstance().mockAnException(retry); + accessToken = km.getAccessToken(eb, + new StorageType[]{blkMovingInfo.getTargetStorageType()}, + new String[0]); + return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb, + new Socket(), km, accessToken); + } catch (IOException e) { + LOG.warn( + "Failed to move block:{} from src:{} to dest:{} to satisfy " + + "storageType:{}, retry: {}", + blkMovingInfo.getBlock(), blkMovingInfo.getSource(), + blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), retry, e); + retry++; + } } - return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb, - new Socket(), km, accessToken); + return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java new file mode 100644 index 00000000000..5ddf1ee3c0f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFaultInjector.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.sps; + +import org.apache.hadoop.classification.VisibleForTesting; + +import java.io.IOException; + +/** + * Used to inject certain faults for testing. + */ +public class ExternalSPSFaultInjector { + @VisibleForTesting + private static ExternalSPSFaultInjector instance = + new ExternalSPSFaultInjector(); + + @VisibleForTesting + public static ExternalSPSFaultInjector getInstance() { + return instance; + } + + @VisibleForTesting + public static void setInstance(ExternalSPSFaultInjector instance) { + ExternalSPSFaultInjector.instance = instance; + } + + @VisibleForTesting + public void mockAnException(int retry) throws IOException { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d45f8eb5b7e..a75a76d7af4 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5507,6 +5507,14 @@ </description> </property> +<property> + <name>dfs.storage.policy.satisfier.move.task.retry.max.attempts</name> + <value>3</value> + <description> + Max retries for moving task to satisfy the block storage policy. + </description> +</property> + <property> <name>dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms</name> <value>300000</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 7ef83c392e2..9b4dc437ce4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -33,6 +33,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; @@ -130,6 +131,14 @@ public class TestExternalStoragePolicySatisfier { private static final int DEFAULT_BLOCK_SIZE = 1024; private static final Logger LOG = LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class); + private final ExternalSPSFaultInjector injector = new ExternalSPSFaultInjector() { + @Override + public void mockAnException(int retry) throws IOException { + if (retry < DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT) { + throw new IOException("IO exception"); + } + } + }; @Before public void setUp() { @@ -480,6 +489,20 @@ public class TestExternalStoragePolicySatisfier { } } + @Test(timeout = 300000) + public void testWhenStoragePolicySetToCOLDWithException() + throws Exception { + + try { + createCluster(); + // Mock an IOException 3 times, and moving tasks should succeed finally. + ExternalSPSFaultInjector.setInstance(injector); + doTestWhenStoragePolicySetToCOLD(); + } finally { + shutdownCluster(); + } + } + private void doTestWhenStoragePolicySetToCOLD() throws Exception { // Change policy to COLD dfs.setStoragePolicy(new Path(FILE), COLD); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org