Repository: hadoop Updated Branches: refs/heads/branch-2 24d879026 -> dd5b2dac5
HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck. Contributed by Anu Engineer and Haohui Mai. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd5b2dac Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd5b2dac Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd5b2dac Branch: refs/heads/branch-2 Commit: dd5b2dac5a81952f579906ddd1c95a2e915b513e Parents: 24d8790 Author: Haohui Mai <whe...@apache.org> Authored: Mon Mar 30 11:59:21 2015 -0700 Committer: Haohui Mai <whe...@apache.org> Committed: Mon Mar 30 11:59:32 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../org/apache/hadoop/hdfs/DataStreamer.java | 2 +- .../hdfs/protocol/datatransfer/PipelineAck.java | 31 +++++++++++++------- .../hdfs/server/datanode/BlockReceiver.java | 2 +- .../src/main/proto/datatransfer.proto | 3 +- .../hadoop/hdfs/TestDataTransferProtocol.java | 31 ++++++++++++++++++++ 6 files changed, 58 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b3cc6b7..667aa05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1009,6 +1009,9 @@ Release 2.7.0 - UNRELEASED HDFS-7963. Fix expected tracing spans in TestTracing along with HDFS-7054. (Masatake Iwasaki via kihwal) + HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck. + (Anu Engineer and Haohui Mai via wheat9) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 6047825..9c437ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -817,7 +817,7 @@ class DataStreamer extends Daemon { // processes response status from datanodes. for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) { final Status reply = PipelineAck.getStatusFromHeader(ack - .getReply(i)); + .getHeaderFlag(i)); // Restart will not be treated differently unless it is // the local node or the only one in the pipeline. if (PipelineAck.isRestartOOBStatus(reply) && http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index 35e5bb8..9bd4115 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -130,13 +130,16 @@ public class PipelineAck { */ public PipelineAck(long seqno, int[] replies, long downstreamAckTimeNanos) { - ArrayList<Integer> replyList = Lists.newArrayList(); + ArrayList<Status> statusList = Lists.newArrayList(); + ArrayList<Integer> flagList = Lists.newArrayList(); for (int r : replies) { - replyList.add(r); + statusList.add(StatusFormat.getStatus(r)); + flagList.add(r); } proto = PipelineAckProto.newBuilder() .setSeqno(seqno) - .addAllReply(replyList) + .addAllReply(statusList) + .addAllFlag(flagList) .setDownstreamAckTimeNanos(downstreamAckTimeNanos) .build(); } @@ -158,11 +161,18 @@ public class PipelineAck { } /** - * get the ith reply - * @return the the ith reply + * get the header flag of ith reply */ - public int getReply(int i) { - return proto.getReply(i); + public int getHeaderFlag(int i) { + if (proto.getFlagCount() > 0) { + return proto.getFlag(i); + } else { + return combineHeader(ECN.DISABLED, proto.getReply(i)); + } + } + + public int getFlag(int i) { + return proto.getFlag(i); } /** @@ -178,8 +188,8 @@ public class PipelineAck { * @return true if all statuses are SUCCESS */ public boolean isSuccess() { - for (int reply : proto.getReplyList()) { - if (StatusFormat.getStatus(reply) != Status.SUCCESS) { + for (Status s : proto.getReplyList()) { + if (s != Status.SUCCESS) { return false; } } @@ -196,10 +206,9 @@ public class PipelineAck { if (getSeqno() != UNKOWN_SEQNO) { return null; } - for (int reply : proto.getReplyList()) { + for (Status s : proto.getReplyList()) { // The following check is valid because protobuf guarantees to // preserve the ordering of enum elements. - Status s = StatusFormat.getStatus(reply); if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) { return s; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 0a2b650..4e8ce94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -1372,7 +1372,7 @@ class BlockReceiver implements Closeable { replies = new int[ackLen + 1]; replies[0] = myHeader; for (int i = 0; i < ackLen; ++i) { - replies[i + 1] = ack.getReply(i); + replies[i + 1] = ack.getHeaderFlag(i); } // If the mirror has reported that it received a corrupt packet, // do self-destruct to mark myself bad, instead of making the http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 8426198..5071d15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -243,8 +243,9 @@ enum ShortCircuitFdResponse { message PipelineAckProto { required sint64 seqno = 1; - repeated uint32 reply = 2; + repeated Status reply = 2; optional uint64 downstreamAckTimeNanos = 3 [default = 0]; + repeated uint32 flag = 4 [packed=true]; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index a6716b1..16889d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -33,6 +33,7 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.Random; +import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; @@ -524,6 +526,35 @@ public class TestDataTransferProtocol { assertFalse(hdr.sanityCheck(100)); } + @Test + public void TestPipeLineAckCompatibility() throws IOException { + DataTransferProtos.PipelineAckProto proto = DataTransferProtos + .PipelineAckProto.newBuilder() + .setSeqno(0) + .addReply(Status.CHECKSUM_OK) + .build(); + + DataTransferProtos.PipelineAckProto newProto = DataTransferProtos + .PipelineAckProto.newBuilder().mergeFrom(proto) + .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, + Status.CHECKSUM_OK)) + .build(); + + ByteOutputStream oldAckBytes = new ByteOutputStream(); + proto.writeDelimitedTo(oldAckBytes); + PipelineAck oldAck = new PipelineAck(); + oldAck.readFields(new ByteArrayInputStream(oldAckBytes.getBytes())); + assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status + .CHECKSUM_OK), oldAck.getHeaderFlag(0)); + + PipelineAck newAck = new PipelineAck(); + ByteOutputStream newAckBytes = new ByteOutputStream(); + newProto.writeDelimitedTo(newAckBytes); + newAck.readFields(new ByteArrayInputStream(newAckBytes.getBytes())); + assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status + .CHECKSUM_OK), newAck.getHeaderFlag(0)); + } + void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException { writeBlock(new ExtendedBlock(poolId, blockId), BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);