[ 
https://issues.apache.org/jira/browse/HDFS-16757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876489#comment-17876489
 ] 

ASF GitHub Bot commented on HDFS-16757:
---------------------------------------

Hexiaoqiao commented on code in PR #6926:
URL: https://github.com/apache/hadoop/pull/6926#discussion_r1730295351


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3067,69 +3087,70 @@ public void run() {
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
-            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            targetStorageTypes, targetStorageIds);
+        Token<BlockTokenIdentifier> accessToken =
+            getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+                targetStorageTypes, targetStorageIds);
 
-        long writeTimeout = dnConf.socketWriteTimeout + 
-                            HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
+        long writeTimeout =
+            dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length
+                - 1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        DataEncryptionKeyFactory keyFactory =
-          getDataEncryptionKeyFactoryForBlock(b);
-        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, keyFactory, accessToken, bpReg);
+        DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(source);
+        IOStreamPair saslStreams =
+            saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, accessToken, bpReg);
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
-        
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtilClient.getSmallBufferSize(getConf())));
+
+        out = new DataOutputStream(
+            new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf())));
         in = new DataInputStream(unbufIn);
-        blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, null, cachingStrategy);
-        DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
-            .build();
-
-        String storageId = targetStorageIds.length > 0 ?
-            targetStorageIds[0] : null;
-        new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
-            clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false, false, null, storageId,
-            targetStorageIds);
+        blockSender =
+            new BlockSender(source, 0, source.getNumBytes(), false, false, true, DataNode.this,
+                null, cachingStrategy);
+        DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg).build();
+
+        String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] : null;
+        new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken, clientname, targets,
+            targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(),
+            cachingStrategy, false, false, null, storageId, targetStorageIds);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, throttler);
 
         // no response necessary
-        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
-            getClass().getSimpleName(), DataNode.this.getDisplayName(),
-            b, b.getNumBytes(), curTarget);
+        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", getClass().getSimpleName(),
+            DataNode.this.getDisplayName(), source, source.getNumBytes(), curTarget);
 
         // read ack
         if (isClient) {
-          DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              PBHelperClient.vintPrefixed(in));
+          DNTransferAckProto closeAck =
+              DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in));
           LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck);
           if (closeAck.getStatus() != Status.SUCCESS) {
             if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
               throw new InvalidBlockTokenException(
-                  "Got access token error for connect ack, targets="
-                   + Arrays.asList(targets));
+                  "Got access token error for connect ack, targets=" + Arrays.asList(targets));
             } else {
-              throw new IOException("Bad connect ack, targets="
-                  + Arrays.asList(targets) + " status=" + closeAck.getStatus());
+              throw new IOException(
+                  "Bad connect ack, targets=" + Arrays.asList(targets) + " status="
+                      + closeAck.getStatus());
             }
           }
         } else {
           metrics.incrBlocksReplicated();
         }
       } catch (IOException ie) {
-        handleBadBlock(b, ie, false);
-        LOG.warn("{}:Failed to transfer {} to {} got",
-            bpReg, b, targets[0], ie);
+        if (copyBlockCrossNamespace) {
+          throw new RuntimeException(ie);
+        }
+        handleBadBlock(source, ie, false);
+        LOG.warn("{}:Failed to transfer {} to {}  got", bpReg, source, targets[0], ie);
       } catch (Throwable t) {
-        LOG.error("Failed to transfer block {}", b, t);
+        LOG.error("Failed to transfer block {}", source, t);
+        if (copyBlockCrossNamespace) {
+          throw new RuntimeException(t);

Review Comment:
   Same as the above comment.



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3067,69 +3087,70 @@ public void run() {
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
-            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            targetStorageTypes, targetStorageIds);
+        Token<BlockTokenIdentifier> accessToken =
+            getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+                targetStorageTypes, targetStorageIds);
 
-        long writeTimeout = dnConf.socketWriteTimeout + 
-                            HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
+        long writeTimeout =
+            dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length
+                - 1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        DataEncryptionKeyFactory keyFactory =
-          getDataEncryptionKeyFactoryForBlock(b);
-        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, keyFactory, accessToken, bpReg);
+        DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(source);
+        IOStreamPair saslStreams =
+            saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, accessToken, bpReg);
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
-        
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtilClient.getSmallBufferSize(getConf())));
+
+        out = new DataOutputStream(
+            new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf())));
         in = new DataInputStream(unbufIn);
-        blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, null, cachingStrategy);
-        DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
-            .build();
-
-        String storageId = targetStorageIds.length > 0 ?
-            targetStorageIds[0] : null;
-        new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
-            clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false, false, null, storageId,
-            targetStorageIds);
+        blockSender =
+            new BlockSender(source, 0, source.getNumBytes(), false, false, true, DataNode.this,
+                null, cachingStrategy);
+        DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg).build();
+
+        String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] : null;
+        new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken, clientname, targets,
+            targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(),
+            cachingStrategy, false, false, null, storageId, targetStorageIds);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, throttler);
 
         // no response necessary
-        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
-            getClass().getSimpleName(), DataNode.this.getDisplayName(),
-            b, b.getNumBytes(), curTarget);
+        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", getClass().getSimpleName(),
+            DataNode.this.getDisplayName(), source, source.getNumBytes(), curTarget);
 
         // read ack
         if (isClient) {
-          DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              PBHelperClient.vintPrefixed(in));
+          DNTransferAckProto closeAck =
+              DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in));
           LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck);
           if (closeAck.getStatus() != Status.SUCCESS) {
             if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
               throw new InvalidBlockTokenException(
-                  "Got access token error for connect ack, targets="
-                   + Arrays.asList(targets));
+                  "Got access token error for connect ack, targets=" + Arrays.asList(targets));
             } else {
-              throw new IOException("Bad connect ack, targets="
-                  + Arrays.asList(targets) + " status=" + closeAck.getStatus());
+              throw new IOException(
+                  "Bad connect ack, targets=" + Arrays.asList(targets) + " status="
+                      + closeAck.getStatus());
             }
           }
         } else {
           metrics.incrBlocksReplicated();
         }
       } catch (IOException ie) {
-        handleBadBlock(b, ie, false);
-        LOG.warn("{}:Failed to transfer {} to {} got",
-            bpReg, b, targets[0], ie);
+        if (copyBlockCrossNamespace) {
+          throw new RuntimeException(ie);

Review Comment:
   Be careful about throwing RuntimeException directly here.
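   
   One way to make the intent clearer, sketched only as an illustration
   (CopyBlockCrossNamespaceException is an invented name, not from the patch):
   wrap the failure in a dedicated unchecked exception, so callers of the
   transfer thread can tell a cross-namespace copy failure apart from an
   arbitrary RuntimeException.
   
   // Hypothetical helper -- not part of the patch.
   public class CopyBlockCrossNamespaceException extends RuntimeException {
     public CopyBlockCrossNamespaceException(String message, Throwable cause) {
       super(message, cause);
     }
   }
   
   // In the catch block, instead of `throw new RuntimeException(ie);`:
   //   throw new CopyBlockCrossNamespaceException(
   //       "Failed to transfer " + source + " to " + targets[0], ie);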



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java:
##########
@@ -3863,5 +3864,27 @@ public void setLastDirScannerFinishTime(long time) {
   public long getPendingAsyncDeletions() {
     return asyncDiskService.countPendingDeletions();
   }
+
+  @Override
+  public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) throws IOException {
+    BlockLocalPathInfo blpi = getBlockLocalPathInfo(srcBlock);
+    FsVolumeImpl v = getVolume(srcBlock);
+
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, dstBlock.getBlockPoolId(),

Review Comment:
   Not sure if it is enough to obtain only the dest block pool lock here.
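   
   A minimal sketch of the concern, assuming lockManager also exposes a
   readLock with the same shape as the (truncated) writeLock call in the diff
   above; the argument list and lock levels here are illustrative, not the
   patch's actual code:
   
   // Hold the SOURCE pool's lock as well as the dest pool's, so the source
   // replica cannot be appended to, invalidated, or moved while its files
   // are being hard-linked. Lock order must be fixed (e.g. by comparing
   // block pool ids) to avoid deadlock between concurrent copies.
   try (AutoCloseableLock srcLock = lockManager.readLock(LockLevel.VOLUME,
            srcBlock.getBlockPoolId(), v.getStorageID());
        AutoCloseableLock dstLock = lockManager.writeLock(LockLevel.VOLUME,
            dstBlock.getBlockPoolId(), v.getStorageID())) {
     // ... hard-link the block and meta files from blpi into the dest pool ...
   }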



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java:
##########
@@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
   private boolean leaseRecovered = false;
+  private ExtendedBlock userAssignmentLastBlock;

Review Comment:
   Sorry, I don't get the purpose of adding `userAssignmentLastBlock` here.



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3003,46 +3010,58 @@ private class DataTransfer implements Runnable {
     final DatanodeInfo[] targets;
     final StorageType[] targetStorageTypes;
     final private String[] targetStorageIds;
-    final ExtendedBlock b;
+    final private ExtendedBlock source;
+    private ExtendedBlock target;
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
     final String clientname;
     final CachingStrategy cachingStrategy;
 
-    /** Throttle to block replication when data transfers or writes. */
+    /**
+     * Throttle to block replication when data transfers or writes.
+     */
     private DataTransferThrottler throttler;
+    private boolean copyBlockCrossNamespace;

Review Comment:
   What does this attribute describe here?



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java:
##########
@@ -136,6 +136,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
   public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
       0; // A value of zero indicates no limit
+  public static final String DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY =

Review Comment:
   Is this config item the same as the CopyOp timeout, and is it necessary?





> Add a new method copyBlockCrossNamespace to DataNode
> ----------------------------------------------------
>
>                 Key: HDFS-16757
>                 URL: https://issues.apache.org/jira/browse/HDFS-16757
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: ZanderXu
>            Assignee: Haiyang Hu
>            Priority: Minor
>              Labels: pull-request-available
>
> Add a new method copyBlockCrossNamespace in DataTransferProtocol at the 
> DataNode side.
> This method copies a source block from one namespace to a target block in a 
> different namespace. If the target DN is the same as the current DN, the 
> block is copied via HardLink; if the target DN differs from the current DN, 
> the block is copied via TransferBlock.
> This method takes the following parameters (see the sketch after this list):
>  * ExtendedBlock sourceBlock
>  * Token<BlockTokenIdentifier> sourceBlockToken
>  * ExtendedBlock targetBlock
>  * Token<BlockTokenIdentifier> targetBlockToken
>  * DatanodeInfo targetDN
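
A minimal sketch of the proposed protocol method, reconstructed only from the
parameter list quoted above; the actual patch's placement in
DataTransferProtocol, its throws clause, and any additional arguments may
differ:

// Sketch -- signature inferred from the issue description, not the patch.
/**
 * Copy sourceBlock (in one namespace) to targetBlock (in another namespace).
 * If targetDN is the current DataNode, the copy may be done via hard link;
 * otherwise the block is transferred to targetDN over the wire.
 */
void copyBlockCrossNamespace(ExtendedBlock sourceBlock,
    Token<BlockTokenIdentifier> sourceBlockToken,
    ExtendedBlock targetBlock,
    Token<BlockTokenIdentifier> targetBlockToken,
    DatanodeInfo targetDN) throws IOException;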



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
