[ 
https://issues.apache.org/jira/browse/HDFS-16757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876719#comment-17876719
 ] 

ASF GitHub Bot commented on HDFS-16757:
---------------------------------------

LiuGuH commented on code in PR #6926:
URL: https://github.com/apache/hadoop/pull/6926#discussion_r1731224445


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3067,69 +3087,70 @@ public void run() {
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
-            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            targetStorageTypes, targetStorageIds);
+        Token<BlockTokenIdentifier> accessToken =
+            getBlockAccessToken(target, 
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+                targetStorageTypes, targetStorageIds);
 
-        long writeTimeout = dnConf.socketWriteTimeout + 
-                            HdfsConstants.WRITE_TIMEOUT_EXTENSION * 
(targets.length-1);
+        long writeTimeout =
+            dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION 
* (targets.length
+                - 1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        DataEncryptionKeyFactory keyFactory =
-          getDataEncryptionKeyFactoryForBlock(b);
-        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, keyFactory, accessToken, bpReg);
+        DataEncryptionKeyFactory keyFactory = 
getDataEncryptionKeyFactoryForBlock(source);
+        IOStreamPair saslStreams =
+            saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, 
accessToken, bpReg);
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
-        
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtilClient.getSmallBufferSize(getConf())));
+
+        out = new DataOutputStream(
+            new BufferedOutputStream(unbufOut, 
DFSUtilClient.getSmallBufferSize(getConf())));
         in = new DataInputStream(unbufIn);
-        blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, null, cachingStrategy);
-        DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
-            .build();
-
-        String storageId = targetStorageIds.length > 0 ?
-            targetStorageIds[0] : null;
-        new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
-            clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false, false, null, storageId,
-            targetStorageIds);
+        blockSender =
+            new BlockSender(source, 0, source.getNumBytes(), false, false, 
true, DataNode.this,
+                null, cachingStrategy);
+        DatanodeInfo srcNode = new 
DatanodeInfoBuilder().setNodeID(bpReg).build();
+
+        String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] : 
null;
+        new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken, 
clientname, targets,
+            targetStorageTypes, srcNode, stage, 0, 0, 0, 0, 
blockSender.getChecksum(),
+            cachingStrategy, false, false, null, storageId, targetStorageIds);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, throttler);
 
         // no response necessary
-        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
-            getClass().getSimpleName(), DataNode.this.getDisplayName(),
-            b, b.getNumBytes(), curTarget);
+        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", 
getClass().getSimpleName(),
+            DataNode.this.getDisplayName(), source, source.getNumBytes(), 
curTarget);
 
         // read ack
         if (isClient) {
-          DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              PBHelperClient.vintPrefixed(in));
+          DNTransferAckProto closeAck =
+              DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in));
           LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck);
           if (closeAck.getStatus() != Status.SUCCESS) {
             if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
               throw new InvalidBlockTokenException(
-                  "Got access token error for connect ack, targets="
-                   + Arrays.asList(targets));
+                  "Got access token error for connect ack, targets=" + 
Arrays.asList(targets));
             } else {
-              throw new IOException("Bad connect ack, targets="
-                  + Arrays.asList(targets) + " status=" + 
closeAck.getStatus());
+              throw new IOException(
+                  "Bad connect ack, targets=" + Arrays.asList(targets) + " 
status="
+                      + closeAck.getStatus());
             }
           }
         } else {
           metrics.incrBlocksReplicated();
         }
       } catch (IOException ie) {
-        handleBadBlock(b, ie, false);
-        LOG.warn("{}:Failed to transfer {} to {} got",
-            bpReg, b, targets[0], ie);
+        if (copyBlockCrossNamespace) {
+          throw new RuntimeException(ie);

Review Comment:
   DataTransfer implements Runnable, so its run() method can only throw a 
RuntimeException. This RuntimeException will then be caught by the try/catch 
around future.get(). 





> Add a new method copyBlockCrossNamespace to DataNode
> ----------------------------------------------------
>
>                 Key: HDFS-16757
>                 URL: https://issues.apache.org/jira/browse/HDFS-16757
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: ZanderXu
>            Assignee: Haiyang Hu
>            Priority: Minor
>              Labels: pull-request-available
>
> Add a new method copyBlockCrossNamespace in DataTransferProtocol on the 
> DataNode side.
> This method will copy a source block from one namespace to a target block in 
> a different namespace. If the target DN is the same as the current DN, 
> this method will copy the block via HardLink. If the target DN is different 
> from the current DN, this method will copy the block via TransferBlock.
> This method will take the following parameters:
>  * ExtendedBlock sourceBlock
>  * Token<BlockTokenIdentifier> sourceBlockToken
>  * ExtendedBlock targetBlock
>  * Token<BlockTokenIdentifier> targetBlockToken
>  * DatanodeInfo targetDN



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to