This is an automated email from the ASF dual-hosted git repository.

vinayakumarb pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f940ab2  HDFS-7663. Erasure Coding: Append on striped file. Contributed by Ayush Saxena.
f940ab2 is described below

commit f940ab242da80a22bae95509d5c282d7e2f7ecdb
Author: Vinayakumar B <vinayakum...@apache.org>
AuthorDate: Tue Mar 5 19:26:42 2019 +0530

    HDFS-7663. Erasure Coding: Append on striped file. Contributed by Ayush Saxena.
---
 .../org/apache/hadoop/hdfs/DFSOutputStream.java    |  16 +--
 .../apache/hadoop/hdfs/DFSStripedOutputStream.java |  20 +++-
 .../hadoop/hdfs/server/namenode/FSDirAppendOp.java |  10 +-
 .../apache/hadoop/hdfs/TestStripedFileAppend.java  | 114 +++++++++++++++++++++
 4 files changed, 145 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index aaef8ad..a4e0742 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -119,7 +119,7 @@ public class DFSOutputStream extends FSOutputSummer
   protected int packetSize = 0; // write packet size, not including the header.
   protected int chunksPerPacket = 0;
   protected long lastFlushOffset = 0; // offset when flush was invoked
-  private long initialFileSize = 0; // at time of file open
+  protected long initialFileSize = 0; // at time of file open
   private final short blockReplication; // replication factor of file
   protected boolean shouldSyncBlock = false; // force blocks to disk upon close
   private final EnumSet<AddBlockFlag> addBlockFlags;
@@ -391,14 +391,16 @@ public class DFSOutputStream extends FSOutputSummer
       EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
       HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
       throws IOException {
-    if(stat.getErasureCodingPolicy() != null) {
-      throw new IOException(
-          "Not support appending to a striping layout file yet.");
-    }
     try (TraceScope ignored =
              dfsClient.newPathTraceScope("newStreamForAppend", src)) {
-      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
-          progress, lastBlock, stat, checksum, favoredNodes);
+      DFSOutputStream out;
+      if (stat.isErasureCoded()) {
+        out = new DFSStripedOutputStream(dfsClient, src, flags, progress,
+            lastBlock, stat, checksum, favoredNodes);
+      } else {
+        out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock,
+            stat, checksum, favoredNodes);
+      }
       out.start();
       return out;
     }
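
With this change, a client can append to an erasure-coded file provided the append starts a new block group. A minimal, illustrative sketch of such a call (the path, buffer size, and data below are hypothetical and not part of this commit; the flag combination matches the new test):

    // Illustrative sketch only: append to a striped (EC) file using the
    // NEW_BLOCK flag, which this commit enables. Path and sizes are made up.
    // (Classes from org.apache.hadoop.conf, org.apache.hadoop.fs, org.apache.hadoop.hdfs.)
    Configuration conf = new HdfsConfiguration();
    byte[] data = new byte[1024];
    try (DistributedFileSystem dfs =
             (DistributedFileSystem) FileSystem.get(conf)) {
      FSDataOutputStream out = dfs.append(new Path("/ecdir/file"),
          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
      out.write(data);   // appended bytes are written into a new block group
      out.close();
    }
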
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 97310ee..ff81995 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -276,6 +276,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
   private final int numAllBlocks;
   private final int numDataBlocks;
   private ExtendedBlock currentBlockGroup;
+  private ExtendedBlock prevBlockGroup4Append;
   private final String[] favoredNodes;
   private final List<StripedDataStreamer> failedStreamers;
   private final Map<Integer, Integer> corruptBlockCountMap;
@@ -324,6 +325,16 @@ public class DFSStripedOutputStream extends DFSOutputStream
     setCurrentStreamer(0);
   }
 
+  /** Construct a new output stream for appending to a file. */
+  DFSStripedOutputStream(DFSClient dfsClient, String src,
+      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
+      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
+      throws IOException {
+    this(dfsClient, src, stat, flags, progress, checksum, favoredNodes);
+    initialFileSize = stat.getLen(); // length of file when opened
+    prevBlockGroup4Append = lastBlock != null ? lastBlock.getBlock() : null;
+  }
+
   private boolean useDirectBuffer() {
     return encoder.preferDirectBuffer();
   }
@@ -473,12 +484,17 @@ public class DFSStripedOutputStream extends DFSOutputStream
         + Arrays.asList(excludedNodes));
 
     // replace failed streamers
+    ExtendedBlock prevBlockGroup = currentBlockGroup;
+    if (prevBlockGroup4Append != null) {
+      prevBlockGroup = prevBlockGroup4Append;
+      prevBlockGroup4Append = null;
+    }
     replaceFailedStreamers();
 
     LOG.debug("Allocating new block group. The previous block group: "
-        + currentBlockGroup);
+        + prevBlockGroup);
     final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
-        currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
+         prevBlockGroup, fileId, favoredNodes, getAddBlockFlags());
     assert lb.isStriped();
     // assign the new block to the current block group
     currentBlockGroup = lb.getBlock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
index be272d2..6b9fd8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -107,12 +107,6 @@ final class FSDirAppendOp {
       }
       final INodeFile file = INodeFile.valueOf(inode, path, true);
 
-      // not support appending file with striped blocks
-      if (file.isStriped()) {
-        throw new UnsupportedOperationException(
-            "Cannot append to files with striped block " + path);
-      }
-
       BlockManager blockManager = fsd.getBlockManager();
       final BlockStoragePolicy lpPolicy = blockManager
           .getStoragePolicy("LAZY_PERSIST");
@@ -192,6 +186,10 @@ final class FSDirAppendOp {
 
     LocatedBlock ret = null;
     if (!newBlock) {
+      if (file.isStriped()) {
+        throw new UnsupportedOperationException(
+            "Append on EC file without new block is not supported.");
+      }
       FSDirectory fsd = fsn.getFSDirectory();
       ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
       if (ret != null && delta != null) {
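
Only the new-block append path is supported for striped files; a plain append (without CreateFlag.NEW_BLOCK) is rejected on the NameNode with the UnsupportedOperationException added above. An illustrative sketch of that failure mode, reusing the DistributedFileSystem handle (dfs) from the sketch above (hypothetical path; depending on RPC unwrapping, the exception may also surface wrapped in a RemoteException):

    // Illustrative sketch only: appending to an EC file without NEW_BLOCK fails.
    try {
      FSDataOutputStream out = dfs.append(new Path("/ecdir/file")); // no NEW_BLOCK flag
      out.close();
    } catch (UnsupportedOperationException | IOException e) {
      // NameNode rejects the append:
      // "Append on EC file without new block is not supported."
    }
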
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStripedFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStripedFileAppend.java
new file mode 100644
index 0000000..b4cf102
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStripedFileAppend.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests append on erasure coded file.
+ */
+public class TestStripedFileAppend {
+  public static final Log LOG = LogFactory.getLog(TestStripedFileAppend.class);
+
+  static {
+    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
+  }
+
+  private static final int NUM_DATA_BLOCKS =
+      StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
+  private static final int CELL_SIZE =
+      StripedFileTestUtil.getDefaultECPolicy().getCellSize();
+  private static final int NUM_DN = 9;
+  private static final int STRIPES_PER_BLOCK = 4;
+  private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+  private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
+  private static final Random RANDOM = new Random();
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+  private Path dir = new Path("/TestFileAppendStriped");
+  private HdfsConfiguration conf = new HdfsConfiguration();
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    dfs.mkdirs(dir);
+    dfs.setErasureCodingPolicy(dir, null);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * test simple append to a closed striped file, with NEW_BLOCK flag enabled.
+   */
+  @Test
+  public void testAppendToNewBlock() throws IOException {
+    int fileLength = 0;
+    int totalSplit = 6;
+    byte[] expected =
+        StripedFileTestUtil.generateBytes(BLOCK_GROUP_SIZE * totalSplit);
+
+    Path file = new Path(dir, "testAppendToNewBlock");
+    FSDataOutputStream out;
+    for (int split = 0; split < totalSplit; split++) {
+      if (split == 0) {
+        out = dfs.create(file);
+      } else {
+        out = dfs.append(file,
+            EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+      }
+      int splitLength = RANDOM.nextInt(BLOCK_GROUP_SIZE);
+      out.write(expected, fileLength, splitLength);
+      fileLength += splitLength;
+      out.close();
+    }
+    expected = Arrays.copyOf(expected, fileLength);
+    LocatedBlocks lbs =
+        dfs.getClient().getLocatedBlocks(file.toString(), 0L, Long.MAX_VALUE);
+    assertEquals(totalSplit, lbs.getLocatedBlocks().size());
+    StripedFileTestUtil.verifyStatefulRead(dfs, file, fileLength, expected,
+        new byte[4096]);
+    StripedFileTestUtil.verifySeek(dfs, file, fileLength,
+        StripedFileTestUtil.getDefaultECPolicy(), totalSplit);
+  }
+
+}
\ No newline at end of file

