LiuGuH commented on code in PR #6926:
URL: https://github.com/apache/hadoop/pull/6926#discussion_r1731224445


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3067,69 +3087,70 @@ public void run() {
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
-            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            targetStorageTypes, targetStorageIds);
+        Token<BlockTokenIdentifier> accessToken =
+            getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+                targetStorageTypes, targetStorageIds);

-        long writeTimeout = dnConf.socketWriteTimeout +
-                            HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
+        long writeTimeout =
+            dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length
+                - 1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        DataEncryptionKeyFactory keyFactory =
-          getDataEncryptionKeyFactoryForBlock(b);
-        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, keyFactory, accessToken, bpReg);
+        DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(source);
+        IOStreamPair saslStreams =
+            saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, accessToken, bpReg);
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
-
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtilClient.getSmallBufferSize(getConf())));
+
+        out = new DataOutputStream(
+            new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf())));
         in = new DataInputStream(unbufIn);
-        blockSender = new BlockSender(b, 0, b.getNumBytes(),
-            false, false, true, DataNode.this, null, cachingStrategy);
-        DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
-            .build();
-
-        String storageId = targetStorageIds.length > 0 ?
-            targetStorageIds[0] : null;
-        new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
-            clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false, false, null, storageId,
-            targetStorageIds);
+        blockSender =
+            new BlockSender(source, 0, source.getNumBytes(), false, false, true, DataNode.this,
+                null, cachingStrategy);
+        DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg).build();
+
+        String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] : null;
+        new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken, clientname, targets,
+            targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(),
+            cachingStrategy, false, false, null, storageId, targetStorageIds);

         // send data & checksum
         blockSender.sendBlock(out, unbufOut, throttler);

         // no response necessary
-        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
-            getClass().getSimpleName(), DataNode.this.getDisplayName(),
-            b, b.getNumBytes(), curTarget);
+        LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", getClass().getSimpleName(),
+            DataNode.this.getDisplayName(), source, source.getNumBytes(), curTarget);

         // read ack
         if (isClient) {
-          DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              PBHelperClient.vintPrefixed(in));
+          DNTransferAckProto closeAck =
+              DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in));
           LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck);
           if (closeAck.getStatus() != Status.SUCCESS) {
             if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
               throw new InvalidBlockTokenException(
-                  "Got access token error for connect ack, targets="
-                   + Arrays.asList(targets));
+                  "Got access token error for connect ack, targets=" + Arrays.asList(targets));
             } else {
-              throw new IOException("Bad connect ack, targets="
-                  + Arrays.asList(targets) + " status=" + closeAck.getStatus());
+              throw new IOException(
+                  "Bad connect ack, targets=" + Arrays.asList(targets) + " status="
+                      + closeAck.getStatus());
             }
           }
         } else {
           metrics.incrBlocksReplicated();
         }
       } catch (IOException ie) {
-        handleBadBlock(b, ie, false);
-        LOG.warn("{}:Failed to transfer {} to {} got",
-            bpReg, b, targets[0], ie);
+        if (copyBlockCrossNamespace) {
+          throw new RuntimeException(ie);

Review Comment:
   DataTransfer implements Runnable, so run() cannot throw checked exceptions; the only way to propagate the failure is an unchecked RuntimeException. That RuntimeException will then be caught by the try/catch around future.get() on the caller's side.
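
   For reference, a minimal standalone sketch of that mechanism (not the DataNode code itself; the class name and error message below are made up for illustration), showing how an unchecked exception thrown inside a Runnable surfaces through future.get():

   ```java
   import java.io.IOException;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class RunnableExceptionDemo {
     public static void main(String[] args) throws InterruptedException {
       ExecutorService executor = Executors.newSingleThreadExecutor();

       // run() cannot declare checked exceptions, so the IOException must be
       // wrapped in an unchecked RuntimeException to escape the task.
       Runnable transfer = () -> {
         try {
           throw new IOException("simulated transfer failure");
         } catch (IOException ie) {
           throw new RuntimeException(ie);
         }
       };

       Future<?> future = executor.submit(transfer);
       try {
         // The RuntimeException re-surfaces here, wrapped in an ExecutionException.
         future.get();
       } catch (ExecutionException e) {
         // e.getCause() is the RuntimeException; its cause is the original IOException.
         System.out.println("caught: " + e.getCause().getCause());
       } finally {
         executor.shutdown();
       }
     }
   }
   ```

   Running this should print `caught: java.io.IOException: simulated transfer failure`, matching the behavior described above: the caller recovers the wrapped IOException through the ExecutionException's cause chain.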


