This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit e1ff3b2f97804c5f0c4cbd7d3598a18866b50604
Author: Masatake Iwasaki <[email protected]>
AuthorDate: Sat Jan 4 01:55:27 2020 +0900

    HDFS-15068. DataNode could meet deadlock if invoke refreshVolumes when register. Contributed by Aiphago.

    Signed-off-by: Masatake Iwasaki <[email protected]>
    (cherry picked from commit 037ec8cfb1406ea3a8225a1b6306c2e78440353b)
---
 .../hdfs/server/datanode/BPOfferService.java       |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      | 147 +++++++++++----------
 .../server/datanode/DataNodeFaultInjector.java     |   5 +
 .../server/datanode/TestDataNodeVolumeFailure.java |  46 +++++++
 4 files changed, 127 insertions(+), 73 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index a25f6a9..68e9eb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -419,7 +419,7 @@ class BPOfferService {
           reg.getStorageInfo().getClusterID(), "cluster ID");
     }
     bpRegistration = reg;
-
+    DataNodeFaultInjector.get().delayWhenOfferServiceHoldLock();
    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
     // Add the initial block token secret keys to the DN's secret manager.
     if (dn.isBlockTokenEnabled) {
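Note: the line added above calls a test hook at the one point where registrationSucceeded() is holding the BPOfferService write lock and is about to take the DataNode lock. For readers unfamiliar with the pattern, here is a minimal sketch of such an injector (hypothetical names; the real class, DataNodeFaultInjector, is patched further below):

    // Sketch only: a swappable singleton whose hooks are no-ops in
    // production and can be overridden by tests to stall a thread at a
    // precise point inside a critical section.
    public class FaultInjectorSketch {
      private static FaultInjectorSketch instance = new FaultInjectorSketch();

      public static FaultInjectorSketch get() { return instance; }
      public static void set(FaultInjectorSketch injector) { instance = injector; }

      // Called while the caller holds its lock; a test subclass can block here.
      public void delayWhenOfferServiceHoldLock() {}
    }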
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index cfe3054..8cd8f98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -745,90 +745,93 @@ public class DataNode extends ReconfigurableBase
    * @throws IOException on error. If an IOException is thrown, some new volumes
    * may have been successfully added and removed.
    */
-  private synchronized void refreshVolumes(String newVolumes) throws IOException {
-    Configuration conf = getConf();
-    conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-    ExecutorService service = null;
-    int numOldDataDirs = dataDirs.size();
-    ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
-    StringBuilder errorMessageBuilder = new StringBuilder();
-    List<String> effectiveVolumes = Lists.newArrayList();
-    for (StorageLocation sl : changedVolumes.unchangedLocations) {
-      effectiveVolumes.add(sl.toString());
+  private void refreshVolumes(String newVolumes) throws IOException {
+    // Add volumes for each Namespace
+    final List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+      nsInfos.add(bpos.getNamespaceInfo());
     }
-
-    try {
-      if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
-          + changedVolumes.newLocations.size()
-          - changedVolumes.deactivateLocations.size() <= 0) {
-        throw new IOException("Attempt to remove all volumes.");
+    synchronized(this) {
+      Configuration conf = getConf();
+      conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
+      ExecutorService service = null;
+      int numOldDataDirs = dataDirs.size();
+      ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
+      StringBuilder errorMessageBuilder = new StringBuilder();
+      List<String> effectiveVolumes = Lists.newArrayList();
+      for (StorageLocation sl : changedVolumes.unchangedLocations) {
+        effectiveVolumes.add(sl.toString());
       }
-      if (!changedVolumes.newLocations.isEmpty()) {
-        LOG.info("Adding new volumes: {}",
-            Joiner.on(",").join(changedVolumes.newLocations));
-
-        // Add volumes for each Namespace
-        final List<NamespaceInfo> nsInfos = Lists.newArrayList();
-        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-          nsInfos.add(bpos.getNamespaceInfo());
+
+      try {
+        if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
+            + changedVolumes.newLocations.size()
+            - changedVolumes.deactivateLocations.size() <= 0) {
+          throw new IOException("Attempt to remove all volumes.");
         }
-        service = Executors
-            .newFixedThreadPool(changedVolumes.newLocations.size());
-        List<Future<IOException>> exceptions = Lists.newArrayList();
-        for (final StorageLocation location : changedVolumes.newLocations) {
-          exceptions.add(service.submit(new Callable<IOException>() {
-            @Override
-            public IOException call() {
-              try {
-                data.addVolume(location, nsInfos);
-              } catch (IOException e) {
-                return e;
+        if (!changedVolumes.newLocations.isEmpty()) {
+          LOG.info("Adding new volumes: {}",
+              Joiner.on(",").join(changedVolumes.newLocations));
+
+          service = Executors
+              .newFixedThreadPool(changedVolumes.newLocations.size());
+          List<Future<IOException>> exceptions = Lists.newArrayList();
+
+          for (final StorageLocation location : changedVolumes.newLocations) {
+            exceptions.add(service.submit(new Callable<IOException>() {
+              @Override
+              public IOException call() {
+                try {
+                  data.addVolume(location, nsInfos);
+                } catch (IOException e) {
+                  return e;
+                }
+                return null;
               }
-              return null;
-            }
-          }));
-        }
+            }));
+          }
 
-        for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
-          StorageLocation volume = changedVolumes.newLocations.get(i);
-          Future<IOException> ioExceptionFuture = exceptions.get(i);
-          try {
-            IOException ioe = ioExceptionFuture.get();
-            if (ioe != null) {
+          for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
+            StorageLocation volume = changedVolumes.newLocations.get(i);
+            Future<IOException> ioExceptionFuture = exceptions.get(i);
+            try {
+              IOException ioe = ioExceptionFuture.get();
+              if (ioe != null) {
+                errorMessageBuilder.append(
+                    String.format("FAILED TO ADD: %s: %s%n",
+                        volume, ioe.getMessage()));
+                LOG.error("Failed to add volume: {}", volume, ioe);
+              } else {
+                effectiveVolumes.add(volume.toString());
+                LOG.info("Successfully added volume: {}", volume);
+              }
+            } catch (Exception e) {
               errorMessageBuilder.append(
-                  String.format("FAILED TO ADD: %s: %s%n",
-                      volume, ioe.getMessage()));
-              LOG.error("Failed to add volume: {}", volume, ioe);
-            } else {
-              effectiveVolumes.add(volume.toString());
-              LOG.info("Successfully added volume: {}", volume);
+                  String.format("FAILED to ADD: %s: %s%n", volume,
+                      e.toString()));
+              LOG.error("Failed to add volume: {}", volume, e);
             }
-          } catch (Exception e) {
-            errorMessageBuilder.append(
-                String.format("FAILED to ADD: %s: %s%n", volume,
-                    e.toString()));
-            LOG.error("Failed to add volume: {}", volume, e);
           }
         }
-      }
 
-      try {
-        removeVolumes(changedVolumes.deactivateLocations);
-      } catch (IOException e) {
-        errorMessageBuilder.append(e.getMessage());
-        LOG.error("Failed to remove volume", e);
-      }
+        try {
+          removeVolumes(changedVolumes.deactivateLocations);
+        } catch (IOException e) {
+          errorMessageBuilder.append(e.getMessage());
+          LOG.error("Failed to remove volume", e);
+        }
 
-      if (errorMessageBuilder.length() > 0) {
-        throw new IOException(errorMessageBuilder.toString());
-      }
-    } finally {
-      if (service != null) {
-        service.shutdown();
+        if (errorMessageBuilder.length() > 0) {
+          throw new IOException(errorMessageBuilder.toString());
+        }
+      } finally {
+        if (service != null) {
+          service.shutdown();
+        }
+        conf.set(DFS_DATANODE_DATA_DIR_KEY,
+            Joiner.on(",").join(effectiveVolumes));
+        dataDirs = getStorageLocations(conf);
       }
-      conf.set(DFS_DATANODE_DATA_DIR_KEY,
-          Joiner.on(",").join(effectiveVolumes));
-      dataDirs = getStorageLocations(conf);
     }
   }
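Note: the hunk above is the substance of the fix. refreshVolumes() is no longer a synchronized method; it first collects each block pool's NamespaceInfo (which takes the BPOfferService lock) and only then enters synchronized(this). Previously, a refresh thread holding the DataNode monitor could block on the BPOfferService lock while a registering thread, already holding that lock in registrationSucceeded(), blocked on the DataNode monitor. A minimal sketch of the two lock orders, using hypothetical stand-in classes rather than the real DataNode/BPOfferService:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Stand-in for BPOfferService: guards its state with a read/write lock.
    class Offer {
      final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
      Node dn;

      // Registration path: offer-service lock first, Node monitor second.
      void registrationSucceeded() {
        rw.writeLock().lock();
        try {
          dn.registered();   // blocks if another thread holds the Node monitor
        } finally {
          rw.writeLock().unlock();
        }
      }

      // Reading namespace info needs the offer-service (read) lock.
      String namespaceInfo() {
        rw.readLock().lock();
        try {
          return "ns";
        } finally {
          rw.readLock().unlock();
        }
      }
    }

    // Stand-in for DataNode.
    class Node {
      Offer bpos;

      synchronized void registered() {}

      // Before the fix: Node monitor first, offer-service lock second,
      // the opposite nesting of registrationSucceeded(), so two threads
      // can each hold one lock and wait forever for the other.
      synchronized void refreshVolumesOld() {
        bpos.namespaceInfo();
      }

      // After the fix: take the offer-service lock, release it, then take
      // the monitor; the two locks are never held at the same time.
      void refreshVolumesNew() {
        String ns = bpos.namespaceInfo();
        synchronized (this) {
          // ... apply volume changes using ns ...
        }
      }
    }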
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 1dd779e..7e66111 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -95,4 +95,9 @@ public class DataNodeFaultInjector {
    * process.
    */
   public void stripedBlockReconstruction() throws IOException {}
+
+  /**
+   * Used as a hook to inject a delay while BPOfferService holds its lock.
+   */
+  public void delayWhenOfferServiceHoldLock() {}
 }
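Note: the test added below exercises the new hook by pinning a registration thread inside the BPOfferService lock while a reconfiguration runs. To run just this test, a typical Surefire invocation from the source root (assuming the usual Maven setup for this module) would be:

    mvn -pl hadoop-hdfs-project/hadoop-hdfs test -Dtest=TestDataNodeVolumeFailure#testRefreshDeadLock

With the DataNode.java change reverted, the @Test(timeout=10000) annotation should turn the deadlock into a timeout failure rather than an indefinite hang.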
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index d9ec2cd..4b4002b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
@@ -405,6 +407,50 @@ public class TestDataNodeVolumeFailure {
   }
 
   /**
+   * Test that {@link DataNode#refreshVolumes(String)} does not deadlock with
+   * {@link BPOfferService#registrationSucceeded(BPServiceActor,
+   * DatanodeRegistration)}.
+   */
+  @Test(timeout=10000)
+  public void testRefreshDeadLock() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+      public void delayWhenOfferServiceHoldLock() {
+        try {
+          latch.await();
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    File volume = cluster.getInstanceStorageDir(0, 0);
+    String dataDirs = volume.getPath();
+    List<BPOfferService> allBpOs = dn.getAllBpOs();
+    BPOfferService service = allBpOs.get(0);
+    BPServiceActor actor = service.getBPServiceActors().get(0);
+    DatanodeRegistration bpRegistration = actor.getBpRegistration();
+
+    Thread register = new Thread(() -> {
+      try {
+        service.registrationSucceeded(actor, bpRegistration);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    register.start();
+    String newdir = dataDirs + "tmp";
+    // Release the registration thread, which holds the BPOfferService lock.
+    latch.countDown();
+    String result = dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newdir);
+    assertNotNull(result);
+  }
+
+  /**
    * Test changing the number of volumes does not impact the disk failure
    * tolerance.
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
