Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 530c2ef91 -> 86c0c6b04


HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck. 
Contributed by Anu Engineer and Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86c0c6b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86c0c6b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86c0c6b0

Branch: refs/heads/branch-2.7
Commit: 86c0c6b0446a29e2551a2b207ddfc25051a0cd47
Parents: 530c2ef
Author: Haohui Mai <whe...@apache.org>
Authored: Mon Mar 30 11:59:21 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Mon Mar 30 12:16:25 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  2 +-
 .../hdfs/protocol/datatransfer/PipelineAck.java | 31 ++++++++++++-------
 .../hdfs/server/datanode/BlockReceiver.java     |  2 +-
 .../src/main/proto/datatransfer.proto           |  3 +-
 .../hadoop/hdfs/TestDataTransferProtocol.java   | 32 ++++++++++++++++++++
 6 files changed, 59 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c0c6b0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b96b24e..90efc99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -941,6 +941,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7963. Fix expected tracing spans in TestTracing along with HDFS-7054.
     (Masatake Iwasaki via kihwal)
 
+    HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck.
+    (Anu Engineer and Haohui Mai via wheat9)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c0c6b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a1b220a..f105530 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -747,7 +747,7 @@ public class DFSOutputStream extends FSOutputSummer
             // processes response status from datanodes.
             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
               final Status reply = PipelineAck.getStatusFromHeader(ack
-                .getReply(i));
+                .getHeaderFlag(i));
               // Restart will not be treated differently unless it is
               // the local node or the only one in the pipeline.
               if (PipelineAck.isRestartOOBStatus(reply) &&

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c0c6b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 35e5bb8..9bd4115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -130,13 +130,16 @@ public class PipelineAck {
    */
   public PipelineAck(long seqno, int[] replies,
                      long downstreamAckTimeNanos) {
-    ArrayList<Integer> replyList = Lists.newArrayList();
+    ArrayList<Status> statusList = Lists.newArrayList();
+    ArrayList<Integer> flagList = Lists.newArrayList();
     for (int r : replies) {
-      replyList.add(r);
+      statusList.add(StatusFormat.getStatus(r));
+      flagList.add(r);
     }
     proto = PipelineAckProto.newBuilder()
       .setSeqno(seqno)
-      .addAllReply(replyList)
+      .addAllReply(statusList)
+      .addAllFlag(flagList)
       .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
       .build();
   }
@@ -158,11 +161,18 @@ public class PipelineAck {
   }
   
   /**
-   * get the ith reply
-   * @return the the ith reply
+   * get the header flag of ith reply
    */
-  public int getReply(int i) {
-    return proto.getReply(i);
+  public int getHeaderFlag(int i) {
+    if (proto.getFlagCount() > 0) {
+      return proto.getFlag(i);
+    } else {
+      return combineHeader(ECN.DISABLED, proto.getReply(i));
+    }
+  }
+
+  public int getFlag(int i) {
+    return proto.getFlag(i);
   }
 
   /**
@@ -178,8 +188,8 @@ public class PipelineAck {
    * @return true if all statuses are SUCCESS
    */
   public boolean isSuccess() {
-    for (int reply : proto.getReplyList()) {
-      if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
+    for (Status s : proto.getReplyList()) {
+      if (s != Status.SUCCESS) {
         return false;
       }
     }
@@ -196,10 +206,9 @@ public class PipelineAck {
     if (getSeqno() != UNKOWN_SEQNO) {
       return null;
     }
-    for (int reply : proto.getReplyList()) {
+    for (Status s : proto.getReplyList()) {
       // The following check is valid because protobuf guarantees to
       // preserve the ordering of enum elements.
-      Status s = StatusFormat.getStatus(reply);
       if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
         return s;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c0c6b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 0a2b650..4e8ce94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1372,7 +1372,7 @@ class BlockReceiver implements Closeable {
         replies = new int[ackLen + 1];
         replies[0] = myHeader;
         for (int i = 0; i < ackLen; ++i) {
-          replies[i + 1] = ack.getReply(i);
+          replies[i + 1] = ack.getHeaderFlag(i);
         }
         // If the mirror has reported that it received a corrupt packet,
         // do self-destruct to mark myself bad, instead of making the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c0c6b0/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 8426198..5071d15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -243,8 +243,9 @@ enum ShortCircuitFdResponse {
 
 message PipelineAckProto {
   required sint64 seqno = 1;
-  repeated uint32 reply = 2;
+  repeated Status reply = 2;
   optional uint64 downstreamAckTimeNanos = 3 [default = 0];
+  repeated uint32 flag = 4 [packed=true];
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c0c6b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index a6716b1..c4ec4f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
@@ -524,6 +525,37 @@ public class TestDataTransferProtocol {
     assertFalse(hdr.sanityCheck(100));
   }
 
+  @Test
+  public void TestPipeLineAckCompatibility() throws IOException {
+    DataTransferProtos.PipelineAckProto proto = DataTransferProtos
+        .PipelineAckProto.newBuilder()
+        .setSeqno(0)
+        .addReply(Status.CHECKSUM_OK)
+        .build();
+
+    DataTransferProtos.PipelineAckProto newProto = DataTransferProtos
+        .PipelineAckProto.newBuilder().mergeFrom(proto)
+        .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+                                           Status.CHECKSUM_OK))
+        .build();
+
+    ByteArrayOutputStream oldAckBytes = new ByteArrayOutputStream();
+    proto.writeDelimitedTo(oldAckBytes);
+    PipelineAck oldAck = new PipelineAck();
+    oldAck.readFields(new ByteArrayInputStream(oldAckBytes.toByteArray()));
+    assertEquals(
+        PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.CHECKSUM_OK),
+        oldAck.getHeaderFlag(0));
+
+    PipelineAck newAck = new PipelineAck();
+    ByteArrayOutputStream newAckBytes = new ByteArrayOutputStream();
+    newProto.writeDelimitedTo(newAckBytes);
+    newAck.readFields(new ByteArrayInputStream(newAckBytes.toByteArray()));
+    assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+                                           Status.CHECKSUM_OK),
+                 newAck.getHeaderFlag(0));
+  }
+
   void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
     writeBlock(new ExtendedBlock(poolId, blockId),
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);

Reply via email to