[ https://issues.apache.org/jira/browse/HDFS-16757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876719#comment-17876719 ]
ASF GitHub Bot commented on HDFS-16757: --------------------------------------- LiuGuH commented on code in PR #6926: URL: https://github.com/apache/hadoop/pull/6926#discussion_r1731224445 ########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java: ########## @@ -3067,69 +3087,70 @@ public void run() { // // Header info // - Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b, - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - targetStorageTypes, targetStorageIds); + Token<BlockTokenIdentifier> accessToken = + getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), + targetStorageTypes, targetStorageIds); - long writeTimeout = dnConf.socketWriteTimeout + - HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); + long writeTimeout = + dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length + - 1); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - DataEncryptionKeyFactory keyFactory = - getDataEncryptionKeyFactoryForBlock(b); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyFactory, accessToken, bpReg); + DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(source); + IOStreamPair saslStreams = + saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, accessToken, bpReg); unbufOut = saslStreams.out; unbufIn = saslStreams.in; - - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtilClient.getSmallBufferSize(getConf()))); + + out = new DataOutputStream( + new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf()))); in = new DataInputStream(unbufIn); - blockSender = new BlockSender(b, 0, b.getNumBytes(), - false, false, true, DataNode.this, null, cachingStrategy); - DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg) - .build(); - - String storageId = 
targetStorageIds.length > 0 ? - targetStorageIds[0] : null; - new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, - clientname, targets, targetStorageTypes, srcNode, - stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy, - false, false, null, storageId, - targetStorageIds); + blockSender = + new BlockSender(source, 0, source.getNumBytes(), false, false, true, DataNode.this, + null, cachingStrategy); + DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg).build(); + + String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] : null; + new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken, clientname, targets, + targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), + cachingStrategy, false, false, null, storageId, targetStorageIds); // send data & checksum blockSender.sendBlock(out, unbufOut, throttler); // no response necessary - LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", - getClass().getSimpleName(), DataNode.this.getDisplayName(), - b, b.getNumBytes(), curTarget); + LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", getClass().getSimpleName(), + DataNode.this.getDisplayName(), source, source.getNumBytes(), curTarget); // read ack if (isClient) { - DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + DNTransferAckProto closeAck = + DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in)); LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck); if (closeAck.getStatus() != Status.SUCCESS) { if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( - "Got access token error for connect ack, targets=" - + Arrays.asList(targets)); + "Got access token error for connect ack, targets=" + Arrays.asList(targets)); } else { - throw new IOException("Bad connect ack, targets=" - + Arrays.asList(targets) + " status=" + closeAck.getStatus()); + throw new IOException( + "Bad 
connect ack, targets=" + Arrays.asList(targets) + " status=" + + closeAck.getStatus()); } } } else { metrics.incrBlocksReplicated(); } } catch (IOException ie) { - handleBadBlock(b, ie, false); - LOG.warn("{}:Failed to transfer {} to {} got", - bpReg, b, targets[0], ie); + if (copyBlockCrossNamespace) { + throw new RuntimeException(ie); Review Comment: DataTransfer implements Runnable, so it can only throw a RuntimeException. This RuntimeException will be caught by the try/catch around future.get(). > Add a new method copyBlockCrossNamespace to DataNode > ---------------------------------------------------- > > Key: HDFS-16757 > URL: https://issues.apache.org/jira/browse/HDFS-16757 > Project: Hadoop HDFS > Issue Type: Sub-task > Reporter: ZanderXu > Assignee: Haiyang Hu > Priority: Minor > Labels: pull-request-available > > Add a new method copyBlockCrossNamespace in DataTransferProtocol on the > DataNode side. > This method will copy a source block from one namespace to a target block > from a different namespace. If the target DN is the same as the current DN, > this method will copy the block via HardLink. If the target DN is different > from the current DN, this method will copy the block via TransferBlock. > This method will contain some parameters: > * ExtendedBlock sourceBlock > * Token<BlockTokenIdentifier> sourceBlockToken > * ExtendedBlock targetBlock > * Token<BlockTokenIdentifier> targetBlockToken > * DatanodeInfo targetDN -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org