KeeProMise commented on code in PR #6926:
URL: https://github.com/apache/hadoop/pull/6926#discussion_r1749928649


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -4388,4 +4409,94 @@ boolean isSlownode() {
   public BlockPoolManager getBlockPoolManager() {
     return blockPoolManager;
   }
+
+  public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targetBlk,
+      DatanodeInfo targetDn) throws IOException {
+    if (!data.isValidBlock(sourceBlk)) {
+      // block does not exist or is under-construction
+      String errStr =
+          "copyBlock:(" + this.getInfoPort() + ") Can't send invalid block " + sourceBlk + " "
+              + data.getReplicaString(sourceBlk.getBlockPoolId(), sourceBlk.getBlockId());
+      LOG.info(errStr);
+      throw new IOException(errStr);
+    }
+    long onDiskLength = data.getLength(sourceBlk);
+    if (sourceBlk.getNumBytes() > onDiskLength) {
+      // Shorter on-disk len indicates corruption so report NN the corrupt block
+      String msg = "copyBlock: Can't replicate block " + sourceBlk + " because on-disk length "
+          + onDiskLength + " is shorter than provided length " + sourceBlk.getNumBytes();
+      LOG.info(msg);
+      throw new IOException(msg);
+    }
+    LOG.info(getDatanodeInfo() + " copyBlock: Starting thread to transfer: " + "block:"
+        + sourceBlk + " from " + this.getDatanodeUuid() + " to " + targetDn.getDatanodeUuid()
+        + "(" + targetDn + ")");
+    Future<?> result;
+    if (this.getDatanodeUuid().equals(targetDn.getDatanodeUuid())) {
+      result = copyBlockCrossNamespaceExecutor.submit(new LocalBlockCopy(sourceBlk, targetBlk));
+    } else {
+      result = copyBlockCrossNamespaceExecutor.submit(
+          new DataCopy(targetDn, sourceBlk, targetBlk).getDataTransfer());
+    }
+    try {
+      result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      throw new IOException(e);
+    }
+  }
+
+  private class DataCopy {
+    private final DataTransfer dataTransfer;
+
+    DataCopy(DatanodeInfo targetDn, ExtendedBlock sourceBlk, ExtendedBlock targetBlk) {
+      FsVolumeImpl volume = (FsVolumeImpl) data.getVolume(sourceBlk);
+      StorageType storageType = volume.getStorageType();
+      String storageId = volume.getStorageID();
+
+      DatanodeInfo[] targets = new DatanodeInfo[] {targetDn};
+      StorageType[] targetStorageTypes = new StorageType[] {storageType};
+      String[] targetStorageIds = new String[] {storageId};
+      dataTransfer =
+          new DataTransfer(targets, targetStorageTypes, targetStorageIds, sourceBlk, targetBlk,
+              PIPELINE_SETUP_CREATE, "");
+    }
+
+    public DataTransfer getDataTransfer() {
+      return dataTransfer;
+    }
+  }
+
+  class LocalBlockCopy implements Callable<Boolean> {
+    private ExtendedBlock sourceBlk = null;
+    private ExtendedBlock targetBlk = null;
+
+    LocalBlockCopy(ExtendedBlock sourceBlk, ExtendedBlock targetBlk) {
+      this.sourceBlk = sourceBlk;
+      this.targetBlk = targetBlk;
+    }
+
+    public Boolean call() throws IOException {
+      try {
+        targetBlk.setNumBytes(sourceBlk.getNumBytes());
+        data.hardLinkOneBlock(sourceBlk, targetBlk);
+        FsVolumeSpi v = (FsVolumeSpi) (getFSDataset().getVolume(targetBlk));
+        closeBlock(targetBlk, null, v.getStorageID(), v.isTransientStorage());
+
+        BlockLocalPathInfo srcBlpi = data.getBlockLocalPathInfo(sourceBlk);
+        BlockLocalPathInfo dstBlpi = data.getBlockLocalPathInfo(targetBlk);
+        LOG.info(
+            getClass().getSimpleName() + ": Hardlinked " + sourceBlk + "( " + srcBlpi.getBlockPath()
+                + " " + srcBlpi.getMetaPath() + " ) " + "to " + targetBlk + "( "
+                + dstBlpi.getBlockPath() + " " + dstBlpi.getMetaPath() + " ) ");
+

Review Comment:
   Same as the comment above.



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -4388,4 +4409,94 @@ boolean isSlownode() {
   public BlockPoolManager getBlockPoolManager() {
     return blockPoolManager;
   }
+
+  public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targetBlk,
+      DatanodeInfo targetDn) throws IOException {
+    if (!data.isValidBlock(sourceBlk)) {
+      // block does not exist or is under-construction
+      String errStr =
+          "copyBlock:(" + this.getInfoPort() + ") Can't send invalid block " + sourceBlk + " "
+              + data.getReplicaString(sourceBlk.getBlockPoolId(), sourceBlk.getBlockId());
+      LOG.info(errStr);
+      throw new IOException(errStr);
+    }
+    long onDiskLength = data.getLength(sourceBlk);
+    if (sourceBlk.getNumBytes() > onDiskLength) {
+      // Shorter on-disk len indicates corruption so report NN the corrupt block
+      String msg = "copyBlock: Can't replicate block " + sourceBlk + " because on-disk length "
+          + onDiskLength + " is shorter than provided length " + sourceBlk.getNumBytes();
+      LOG.info(msg);
+      throw new IOException(msg);
+    }
+    LOG.info(getDatanodeInfo() + " copyBlock: Starting thread to transfer: " + "block:"
+        + sourceBlk + " from " + this.getDatanodeUuid() + " to " + targetDn.getDatanodeUuid()
+        + "(" + targetDn + ")");
+    Future<?> result;
+    if (this.getDatanodeUuid().equals(targetDn.getDatanodeUuid())) {
+      result = copyBlockCrossNamespaceExecutor.submit(new LocalBlockCopy(sourceBlk, targetBlk));
+    } else {
+      result = copyBlockCrossNamespaceExecutor.submit(
+          new DataCopy(targetDn, sourceBlk, targetBlk).getDataTransfer());
+    }
+    try {
+      result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      throw new IOException(e);
+    }

Review Comment:
   Should we distinguish between InterruptedException, ExecutionException, and TimeoutException here instead of catching a bare Exception?
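   This is not the patch author's implementation, just a minimal sketch of what the split handling could look like, reusing `result`, `sourceBlk`, and `getDnConf()` from the hunk above:
   ```java
   try {
     result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS);
   } catch (InterruptedException e) {
     // Restore the interrupt flag so upstream callers can observe it.
     Thread.currentThread().interrupt();
     throw new IOException("Interrupted while copying " + sourceBlk + " cross namespace", e);
   } catch (ExecutionException e) {
     // Unwrap the real failure thrown inside the copy task.
     throw new IOException("Cross-namespace copy of " + sourceBlk + " failed", e.getCause());
   } catch (TimeoutException e) {
     // Cancel the still-running task before surfacing the timeout.
     result.cancel(true);
     throw new IOException("Cross-namespace copy of " + sourceBlk + " timed out", e);
   }
   ```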



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java:
##########
@@ -3863,5 +3864,27 @@ public void setLastDirScannerFinishTime(long time) {
   public long getPendingAsyncDeletions() {
     return asyncDiskService.countPendingDeletions();
   }
+
+  @Override
+  public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) throws IOException {
+    BlockLocalPathInfo blpi = getBlockLocalPathInfo(srcBlock);
+    FsVolumeImpl v = getVolume(srcBlock);
+
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, dstBlock.getBlockPoolId(),
+        v.getStorageID())) {
+      File src = new File(blpi.getBlockPath());
+      File srcMeta = new File(blpi.getMetaPath());
+      BlockPoolSlice dstBPS = v.getBlockPoolSlice(dstBlock.getBlockPoolId());
+      File dstBlockFile = dstBPS.hardLinkOneBlock(src, srcMeta, dstBlock.getLocalBlock());
+
+      ReplicaInfo replicaInfo =
+          new LocalReplicaInPipeline(dstBlock.getBlockId(), dstBlock.getGenerationStamp(), v,
+              dstBlockFile.getParentFile(), dstBlock.getLocalBlock().getNumBytes());
+      dstBlockFile = dstBPS.addFinalizedBlock(dstBlock.getLocalBlock(), replicaInfo);
+      replicaInfo = new FinalizedReplica(dstBlock.getLocalBlock(), getVolume(srcBlock),
+          dstBlockFile.getParentFile());
+      volumeMap.add(dstBlock.getBlockPoolId(), replicaInfo);

Review Comment:
   Since volumeMap.add() needs to acquire the block pool's read lock, can we move the volumeMap.add() call outside the try block and execute it after releasing the volume write lock? I'm not sure whether this approach would introduce any concurrency issues.
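   For illustration, a rough sketch of the reordering I have in mind, using the names from the hunk above (assuming the finalized replica can be registered safely once its files are in place):
   ```java
   ReplicaInfo replicaInfo;
   try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
       dstBlock.getBlockPoolId(), v.getStorageID())) {
     File src = new File(blpi.getBlockPath());
     File srcMeta = new File(blpi.getMetaPath());
     BlockPoolSlice dstBPS = v.getBlockPoolSlice(dstBlock.getBlockPoolId());
     File dstBlockFile = dstBPS.hardLinkOneBlock(src, srcMeta, dstBlock.getLocalBlock());

     replicaInfo = new LocalReplicaInPipeline(dstBlock.getBlockId(),
         dstBlock.getGenerationStamp(), v, dstBlockFile.getParentFile(),
         dstBlock.getLocalBlock().getNumBytes());
     dstBlockFile = dstBPS.addFinalizedBlock(dstBlock.getLocalBlock(), replicaInfo);
     replicaInfo = new FinalizedReplica(dstBlock.getLocalBlock(), getVolume(srcBlock),
         dstBlockFile.getParentFile());
   }
   // The volume write lock is released at this point; volumeMap.add() then
   // only needs to take the block pool's read lock.
   volumeMap.add(dstBlock.getBlockPoolId(), replicaInfo);
   ```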



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -4388,4 +4409,94 @@ boolean isSlownode() {
   public BlockPoolManager getBlockPoolManager() {
     return blockPoolManager;
   }
+
+  public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targetBlk,
+      DatanodeInfo targetDn) throws IOException {
+    if (!data.isValidBlock(sourceBlk)) {
+      // block does not exist or is under-construction
+      String errStr =
+          "copyBlock:(" + this.getInfoPort() + ") Can't send invalid block " + sourceBlk + " "
+              + data.getReplicaString(sourceBlk.getBlockPoolId(), sourceBlk.getBlockId());
+      LOG.info(errStr);
+      throw new IOException(errStr);
+    }
+    long onDiskLength = data.getLength(sourceBlk);
+    if (sourceBlk.getNumBytes() > onDiskLength) {
+      // Shorter on-disk len indicates corruption so report NN the corrupt block
+      String msg = "copyBlock: Can't replicate block " + sourceBlk + " because on-disk length "
+          + onDiskLength + " is shorter than provided length " + sourceBlk.getNumBytes();
+      LOG.info(msg);
+      throw new IOException(msg);
+    }
+    LOG.info(getDatanodeInfo() + " copyBlock: Starting thread to transfer: " + "block:"
+        + sourceBlk + " from " + this.getDatanodeUuid() + " to " + targetDn.getDatanodeUuid()
+        + "(" + targetDn + ")");
+    Future<?> result;
+    if (this.getDatanodeUuid().equals(targetDn.getDatanodeUuid())) {
+      result = copyBlockCrossNamespaceExecutor.submit(new LocalBlockCopy(sourceBlk, targetBlk));
+    } else {
+      result = copyBlockCrossNamespaceExecutor.submit(
+          new DataCopy(targetDn, sourceBlk, targetBlk).getDataTransfer());
+    }
+    try {
+      result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      throw new IOException(e);
+    }
+  }
+
+  private class DataCopy {
+    private final DataTransfer dataTransfer;
+
+    DataCopy(DatanodeInfo targetDn, ExtendedBlock sourceBlk, ExtendedBlock targetBlk) {
+      FsVolumeImpl volume = (FsVolumeImpl) data.getVolume(sourceBlk);
+      StorageType storageType = volume.getStorageType();
+      String storageId = volume.getStorageID();
+
+      DatanodeInfo[] targets = new DatanodeInfo[] {targetDn};
+      StorageType[] targetStorageTypes = new StorageType[] {storageType};
+      String[] targetStorageIds = new String[] {storageId};
+      dataTransfer =
+          new DataTransfer(targets, targetStorageTypes, targetStorageIds, sourceBlk, targetBlk,
+              PIPELINE_SETUP_CREATE, "");
+    }
+
+    public DataTransfer getDataTransfer() {
+      return dataTransfer;
+    }
+  }
+
+  class LocalBlockCopy implements Callable<Boolean> {
+    private ExtendedBlock sourceBlk = null;
+    private ExtendedBlock targetBlk = null;
+
+    LocalBlockCopy(ExtendedBlock sourceBlk, ExtendedBlock targetBlk) {
+      this.sourceBlk = sourceBlk;
+      this.targetBlk = targetBlk;
+    }
+
+    public Boolean call() throws IOException {
+      try {
+        targetBlk.setNumBytes(sourceBlk.getNumBytes());
+        data.hardLinkOneBlock(sourceBlk, targetBlk);
+        FsVolumeSpi v = (FsVolumeSpi) (getFSDataset().getVolume(targetBlk));
+        closeBlock(targetBlk, null, v.getStorageID(), v.isTransientStorage());
+
+        BlockLocalPathInfo srcBlpi = data.getBlockLocalPathInfo(sourceBlk);
+        BlockLocalPathInfo dstBlpi = data.getBlockLocalPathInfo(targetBlk);
+        LOG.info(
+            getClass().getSimpleName() + ": Hardlinked " + sourceBlk + "( " + srcBlpi.getBlockPath()
+                + " " + srcBlpi.getMetaPath() + " ) " + "to " + targetBlk + "( "
+                + dstBlpi.getBlockPath() + " " + dstBlpi.getMetaPath() + " ) ");
+
+        metrics.incrBlocksReplicatedViaHardlink();
+      } catch (IOException e) {
+        LOG.warn("Local block copy for src : " + sourceBlk.getBlockName() + ", dst : "
+            + targetBlk.getBlockName() + " failed", e);

Review Comment:
   Same as the comment above.
   Personal opinion: it would be cleaner to abstract a common base class from DataCopy and LocalBlockCopy and have both of them extend it.
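   To make the suggestion concrete, a rough sketch of the shape I mean (class and method names here are illustrative only, not part of the patch):
   ```java
   // Hypothetical common base class for the two copy strategies.
   abstract class CrossNamespaceBlockCopy implements Callable<Boolean> {
     protected final ExtendedBlock sourceBlk;
     protected final ExtendedBlock targetBlk;

     CrossNamespaceBlockCopy(ExtendedBlock sourceBlk, ExtendedBlock targetBlk) {
       this.sourceBlk = sourceBlk;
       this.targetBlk = targetBlk;
     }

     /** Subclasses decide how the bytes move: local hardlink vs. network transfer. */
     protected abstract void doCopy() throws IOException;

     @Override
     public Boolean call() throws IOException {
       try {
         doCopy();
       } catch (IOException e) {
         LOG.warn("Block copy for src : " + sourceBlk.getBlockName() + ", dst : "
             + targetBlk.getBlockName() + " failed", e);
         throw e;
       }
       return true;
     }
   }
   ```
   Then LocalBlockCopy would implement doCopy() with the hardlink path, DataCopy would run its DataTransfer there, and copyBlockCrossNamespace() could submit either one through the same Callable type.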


