Repository: hadoop
Updated Branches:
  refs/heads/branch-3.2 966d94ea2 -> 90a9837c9


HDFS-13956. iNotify should include information to identify a file as either 
replicated or erasure coded. Contributed by Hrishikesh Gadre.

Signed-off-by: Wei-Chiu Chuang <[email protected]>
(cherry picked from commit bf3d591f0cb0fedeab5d89cc8d2270d3b9a70313)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/90a9837c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/90a9837c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/90a9837c

Branch: refs/heads/branch-3.2
Commit: 90a9837c9dd3e3a4b6631916ae3dab7c3745b9a7
Parents: 966d94e
Author: Hrishikesh Gadre <[email protected]>
Authored: Wed Oct 10 10:23:07 2018 -0700
Committer: Wei-Chiu Chuang <[email protected]>
Committed: Wed Oct 10 10:24:23 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/inotify/Event.java   |  14 +++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  60 ++++++-----
 .../src/main/proto/inotify.proto                |   1 +
 .../namenode/InotifyFSEditLogOpTranslator.java  |   3 +
 .../hdfs/TestDFSInotifyEventInputStream.java    | 100 +++++++++++++++++++
 5 files changed, 152 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/90a9837c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
index 78a85b0..2ae8f0a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Events sent by the inotify system. Note that no events are necessarily sent
@@ -112,6 +113,7 @@ public abstract class Event {
     private String symlinkTarget;
     private boolean overwrite;
     private long defaultBlockSize;
+    private Optional<Boolean> erasureCoded;
 
     public static class Builder {
       private INodeType iNodeType;
@@ -124,6 +126,7 @@ public abstract class Event {
       private String symlinkTarget;
       private boolean overwrite;
       private long defaultBlockSize = 0;
+      private Optional<Boolean> erasureCoded = Optional.empty();
 
       public Builder iNodeType(INodeType type) {
         this.iNodeType = type;
@@ -175,6 +178,11 @@ public abstract class Event {
         return this;
       }
 
+      public Builder erasureCoded(boolean ecCoded) {
+        this.erasureCoded = Optional.of(ecCoded);
+        return this;
+      }
+
       public CreateEvent build() {
         return new CreateEvent(this);
       }
@@ -192,6 +200,7 @@ public abstract class Event {
       this.symlinkTarget = b.symlinkTarget;
       this.overwrite = b.overwrite;
       this.defaultBlockSize = b.defaultBlockSize;
+      this.erasureCoded = b.erasureCoded;
     }
 
     public INodeType getiNodeType() {
@@ -243,6 +252,10 @@ public abstract class Event {
       return defaultBlockSize;
     }
 
+    public Optional<Boolean> isErasureCoded() {
+      return erasureCoded;
+    }
+
     @Override
     @InterfaceStability.Unstable
     public String toString() {
@@ -261,6 +274,7 @@ public abstract class Event {
 
       content.append("overwrite=").append(overwrite)
           .append(", defaultBlockSize=").append(defaultBlockSize)
+          .append(", erasureCoded=").append(erasureCoded)
           .append("]");
       return content.toString();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90a9837c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 4a5a493..4b966bb 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -908,18 +908,22 @@ public class PBHelperClient {
         case EVENT_CREATE:
           InotifyProtos.CreateEventProto create =
               InotifyProtos.CreateEventProto.parseFrom(p.getContents());
-          events.add(new Event.CreateEvent.Builder()
-              .iNodeType(createTypeConvert(create.getType()))
-              .path(create.getPath())
-              .ctime(create.getCtime())
-              .ownerName(create.getOwnerName())
-              .groupName(create.getGroupName())
-              .perms(convert(create.getPerms()))
-              .replication(create.getReplication())
-              .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
-                  create.getSymlinkTarget())
-              .defaultBlockSize(create.getDefaultBlockSize())
-              .overwrite(create.getOverwrite()).build());
+          Event.CreateEvent.Builder builder = new Event.CreateEvent.Builder()
+                  .iNodeType(createTypeConvert(create.getType()))
+                  .path(create.getPath())
+                  .ctime(create.getCtime())
+                  .ownerName(create.getOwnerName())
+                  .groupName(create.getGroupName())
+                  .perms(convert(create.getPerms()))
+                  .replication(create.getReplication())
+                  .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
+                          create.getSymlinkTarget())
+                  .defaultBlockSize(create.getDefaultBlockSize())
+                  .overwrite(create.getOverwrite());
+          if (create.hasErasureCoded()) {
+            builder.erasureCoded(create.getErasureCoded());
+          }
+          events.add(builder.build());
           break;
         case EVENT_METADATA:
           InotifyProtos.MetadataUpdateEventProto meta =
@@ -2909,22 +2913,26 @@ public class PBHelperClient {
           break;
         case CREATE:
           Event.CreateEvent ce2 = (Event.CreateEvent) e;
+          InotifyProtos.CreateEventProto.Builder pB =
+                  (InotifyProtos.CreateEventProto.newBuilder());
+          pB.setType(createTypeConvert(ce2.getiNodeType()))
+             .setPath(ce2.getPath())
+             .setCtime(ce2.getCtime())
+             .setOwnerName(ce2.getOwnerName())
+             .setGroupName(ce2.getGroupName())
+             .setPerms(convert(ce2.getPerms()))
+             .setReplication(ce2.getReplication())
+             .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
+                        "" : ce2.getSymlinkTarget())
+             .setDefaultBlockSize(ce2.getDefaultBlockSize())
+             .setOverwrite(ce2.getOverwrite());
+          if (ce2.isErasureCoded().isPresent()) {
+            pB.setErasureCoded(ce2.isErasureCoded().get());
+          }
           events.add(InotifyProtos.EventProto.newBuilder()
               .setType(InotifyProtos.EventType.EVENT_CREATE)
-              .setContents(
-                  InotifyProtos.CreateEventProto.newBuilder()
-                      .setType(createTypeConvert(ce2.getiNodeType()))
-                      .setPath(ce2.getPath())
-                      .setCtime(ce2.getCtime())
-                      .setOwnerName(ce2.getOwnerName())
-                      .setGroupName(ce2.getGroupName())
-                      .setPerms(convert(ce2.getPerms()))
-                      .setReplication(ce2.getReplication())
-                      .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
-                          "" : ce2.getSymlinkTarget())
-                      .setDefaultBlockSize(ce2.getDefaultBlockSize())
-                      .setOverwrite(ce2.getOverwrite()).build().toByteString()
-              ).build());
+              .setContents(pB.build().toByteString())
+              .build());
           break;
         case METADATA:
           Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90a9837c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto
index 5339902..f193408 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto
@@ -80,6 +80,7 @@ message CreateEventProto {
   optional string symlinkTarget = 8;
   optional bool overwrite = 9;
   optional int64 defaultBlockSize = 10 [default=0];
+  optional bool erasureCoded = 11;
 }
 
 message CloseEventProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90a9837c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
index 0918107..8a54c8a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.inotify.Event;
 import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
 
 import java.util.List;
 
@@ -54,6 +55,8 @@ public class InotifyFSEditLogOpTranslator {
             .perms(addOp.permissions.getPermission())
             .overwrite(addOp.overwrite)
             .defaultBlockSize(addOp.blockSize)
+            .erasureCoded(addOp.erasureCodingPolicyId
+                    != ErasureCodeConstants.REPLICATION_POLICY_ID)
             .iNodeType(Event.CreateEvent.INodeType.FILE).build() });
       } else { // append
         return new EventBatch(op.txid,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/90a9837c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index b0b85e7..05d3c63 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -140,6 +141,7 @@ public class TestDFSInotifyEventInputStream {
       client.rename("/file5", "/dir"); // RenameOldOp -> RenameEvent
       //TruncateOp -> TruncateEvent
       client.truncate("/truncate_file", BLOCK_SIZE);
+      client.create("/file_ec_test1", false);
       EventBatch batch = null;
 
       // RenameOp
@@ -180,6 +182,8 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(ce.getSymlinkTarget() == null);
       Assert.assertTrue(ce.getOverwrite());
       Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize());
+      Assert.assertTrue(ce.isErasureCoded().isPresent());
+      Assert.assertFalse(ce.isErasureCoded().get());
       LOG.info(ce.toString());
       Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
 
@@ -395,6 +399,25 @@ public class TestDFSInotifyEventInputStream {
       LOG.info(et.toString());
       Assert.assertTrue(et.toString().startsWith("TruncateEvent [path="));
 
+      // CreateEvent without overwrite
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType()
+              == Event.EventType.CREATE);
+      ce = (Event.CreateEvent) batch.getEvents()[0];
+      Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
+      Assert.assertTrue(ce.getPath().equals("/file_ec_test1"));
+      Assert.assertTrue(ce.getCtime() > 0);
+      Assert.assertTrue(ce.getReplication() > 0);
+      Assert.assertTrue(ce.getSymlinkTarget() == null);
+      Assert.assertFalse(ce.getOverwrite());
+      Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize());
+      Assert.assertTrue(ce.isErasureCoded().isPresent());
+      Assert.assertFalse(ce.isErasureCoded().get());
+      LOG.info(ce.toString());
+      Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
+
       // Returns null when there are no further events
       Assert.assertTrue(eis.poll() == null);
 
@@ -411,6 +434,83 @@ public class TestDFSInotifyEventInputStream {
   }
 
   @Test(timeout = 120000)
+  public void testErasureCodedFiles() throws Exception {
+    ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, ecPolicy.getCellSize());
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    // so that we can get an atime change
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1);
+
+    MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
+    builder.getDfsBuilder().numDataNodes(dataUnits + parityUnits);
+    MiniQJMHACluster cluster = builder.build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0)
+              .getNameNodeAddress(), conf);
+      DistributedFileSystem fs =
+              (DistributedFileSystem)cluster.getDfsCluster().getFileSystem(0);
+
+      Path ecDir = new Path("/ecdir");
+      fs.mkdirs(ecDir);
+      fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
+
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+
+      int sz = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+      byte[] contents = new byte[sz];
+      DFSTestUtil.writeFile(fs, new Path("/ecdir/file_ec_test2"), contents);
+
+      EventBatch batch = null;
+
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      long txid = batch.getTxid();
+      long eventsBehind = eis.getTxidsBehindEstimate();
+      Assert.assertTrue(batch.getEvents()[0].getEventType()
+              == Event.EventType.CREATE);
+      Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
+      Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
+      Assert.assertTrue(ce.getPath().equals("/ecdir/file_ec_test2"));
+      Assert.assertTrue(ce.getCtime() > 0);
+      Assert.assertEquals(1, ce.getReplication());
+      Assert.assertTrue(ce.getSymlinkTarget() == null);
+      Assert.assertTrue(ce.getOverwrite());
+      Assert.assertEquals(ecPolicy.getCellSize(), ce.getDefaultBlockSize());
+      Assert.assertTrue(ce.isErasureCoded().isPresent());
+      Assert.assertTrue(ce.isErasureCoded().get());
+      LOG.info(ce.toString());
+      Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
+
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType()
+              == Event.EventType.CLOSE);
+      Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath()
+              .equals("/ecdir/file_ec_test2"));
+
+      // Returns null when there are no further events
+      Assert.assertTrue(eis.poll() == null);
+
+      // make sure the estimate hasn't changed since the above assertion
+      // tells us that we are fully caught up to the current namesystem state
+      // and we should not have been behind at all when eventsBehind was set
+      // either, since there were few enough events that they should have all
+      // been read to the client during the first poll() call
+      Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
   public void testNNFailover() throws IOException, URISyntaxException,
       MissingEventsException {
     Configuration conf = new HdfsConfiguration();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to