This is an automated email from the ASF dual-hosted git repository.

vinayakumarb pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new f940ab2  HDFS-7663. Erasure Coding: Append on striped file. Contributed by Ayush Saxena.

f940ab2 is described below

commit f940ab242da80a22bae95509d5c282d7e2f7ecdb
Author: Vinayakumar B <vinayakum...@apache.org>
AuthorDate: Tue Mar 5 19:26:42 2019 +0530

    HDFS-7663. Erasure Coding: Append on striped file. Contributed by Ayush Saxena.
---
 .../org/apache/hadoop/hdfs/DFSOutputStream.java    |  16 +--
 .../apache/hadoop/hdfs/DFSStripedOutputStream.java |  20 +++-
 .../hadoop/hdfs/server/namenode/FSDirAppendOp.java |  10 +-
 .../apache/hadoop/hdfs/TestStripedFileAppend.java  | 114 +++++++++++++++++++++
 4 files changed, 145 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index aaef8ad..a4e0742 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -119,7 +119,7 @@ public class DFSOutputStream extends FSOutputSummer
   protected int packetSize = 0; // write packet size, not including the header.
   protected int chunksPerPacket = 0;
   protected long lastFlushOffset = 0; // offset when flush was invoked
-  private long initialFileSize = 0; // at time of file open
+  protected long initialFileSize = 0; // at time of file open
   private final short blockReplication; // replication factor of file
   protected boolean shouldSyncBlock = false; // force blocks to disk upon close
   private final EnumSet<AddBlockFlag> addBlockFlags;
@@ -391,14 +391,16 @@ public class DFSOutputStream extends FSOutputSummer
       EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
       HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
       throws IOException {
-    if(stat.getErasureCodingPolicy() != null) {
-      throw new IOException(
-          "Not support appending to a striping layout file yet.");
-    }
     try (TraceScope ignored =
              dfsClient.newPathTraceScope("newStreamForAppend", src)) {
-      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
-          progress, lastBlock, stat, checksum, favoredNodes);
+      DFSOutputStream out;
+      if (stat.isErasureCoded()) {
+        out = new DFSStripedOutputStream(dfsClient, src, flags, progress,
+            lastBlock, stat, checksum, favoredNodes);
+      } else {
+        out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock,
+            stat, checksum, favoredNodes);
+      }
       out.start();
       return out;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 97310ee..ff81995 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -276,6 +276,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
   private final int numAllBlocks;
   private final int numDataBlocks;
   private ExtendedBlock currentBlockGroup;
+  private ExtendedBlock prevBlockGroup4Append;
   private final String[] favoredNodes;
   private final List<StripedDataStreamer> failedStreamers;
   private final Map<Integer, Integer> corruptBlockCountMap;
@@ -324,6 +325,16 @@ public class DFSStripedOutputStream extends DFSOutputStream
     setCurrentStreamer(0);
   }
 
+  /** Construct a new output stream for appending to a file. */
+  DFSStripedOutputStream(DFSClient dfsClient, String src,
+      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
+      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
+      throws IOException {
+    this(dfsClient, src, stat, flags, progress, checksum, favoredNodes);
+    initialFileSize = stat.getLen(); // length of file when opened
+    prevBlockGroup4Append = lastBlock != null ? lastBlock.getBlock() : null;
+  }
+
   private boolean useDirectBuffer() {
     return encoder.preferDirectBuffer();
   }
@@ -473,12 +484,17 @@ public class DFSStripedOutputStream extends DFSOutputStream
         + Arrays.asList(excludedNodes));
 
     // replace failed streamers
+    ExtendedBlock prevBlockGroup = currentBlockGroup;
+    if (prevBlockGroup4Append != null) {
+      prevBlockGroup = prevBlockGroup4Append;
+      prevBlockGroup4Append = null;
+    }
     replaceFailedStreamers();
     LOG.debug("Allocating new block group. The previous block group: "
-        + currentBlockGroup);
+        + prevBlockGroup);
     final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
-        currentBlockGroup, fileId, favoredNodes, getAddBlockFlags());
+        prevBlockGroup, fileId, favoredNodes, getAddBlockFlags());
     assert lb.isStriped();
     // assign the new block to the current block group
     currentBlockGroup = lb.getBlock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
index be272d2..6b9fd8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -107,12 +107,6 @@ final class FSDirAppendOp {
       }
       final INodeFile file = INodeFile.valueOf(inode, path, true);
 
-      // not support appending file with striped blocks
-      if (file.isStriped()) {
-        throw new UnsupportedOperationException(
-            "Cannot append to files with striped block " + path);
-      }
-
       BlockManager blockManager = fsd.getBlockManager();
       final BlockStoragePolicy lpPolicy = blockManager
           .getStoragePolicy("LAZY_PERSIST");
@@ -192,6 +186,10 @@ final class FSDirAppendOp {
     LocatedBlock ret = null;
     if (!newBlock) {
+      if (file.isStriped()) {
+        throw new UnsupportedOperationException(
+            "Append on EC file without new block is not supported.");
+      }
       FSDirectory fsd = fsn.getFSDirectory();
       ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
       if (ret != null && delta != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStripedFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStripedFileAppend.java
new file mode 100644
index 0000000..b4cf102
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStripedFileAppend.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests append on erasure coded file.
+ */
+public class TestStripedFileAppend {
+  public static final Log LOG = LogFactory.getLog(TestStripedFileAppend.class);
+
+  static {
+    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
+  }
+
+  private static final int NUM_DATA_BLOCKS =
+      StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
+  private static final int CELL_SIZE =
+      StripedFileTestUtil.getDefaultECPolicy().getCellSize();
+  private static final int NUM_DN = 9;
+  private static final int STRIPES_PER_BLOCK = 4;
+  private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+  private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
+  private static final Random RANDOM = new Random();
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+  private Path dir = new Path("/TestFileAppendStriped");
+  private HdfsConfiguration conf = new HdfsConfiguration();
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    dfs.mkdirs(dir);
+    dfs.setErasureCodingPolicy(dir, null);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * test simple append to a closed striped file, with NEW_BLOCK flag enabled.
+   */
+  @Test
+  public void testAppendToNewBlock() throws IOException {
+    int fileLength = 0;
+    int totalSplit = 6;
+    byte[] expected =
+        StripedFileTestUtil.generateBytes(BLOCK_GROUP_SIZE * totalSplit);
+
+    Path file = new Path(dir, "testAppendToNewBlock");
+    FSDataOutputStream out;
+    for (int split = 0; split < totalSplit; split++) {
+      if (split == 0) {
+        out = dfs.create(file);
+      } else {
+        out = dfs.append(file,
+            EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+      }
+      int splitLength = RANDOM.nextInt(BLOCK_GROUP_SIZE);
+      out.write(expected, fileLength, splitLength);
+      fileLength += splitLength;
+      out.close();
+    }
+    expected = Arrays.copyOf(expected, fileLength);
+    LocatedBlocks lbs =
+        dfs.getClient().getLocatedBlocks(file.toString(), 0L, Long.MAX_VALUE);
+    assertEquals(totalSplit, lbs.getLocatedBlocks().size());
+    StripedFileTestUtil.verifyStatefulRead(dfs, file, fileLength, expected,
+        new byte[4096]);
+    StripedFileTestUtil.verifySeek(dfs, file, fileLength,
+        StripedFileTestUtil.getDefaultECPolicy(), totalSplit);
+  }
+
+}
\ No newline at end of file
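
For reference, a minimal client-side sketch of the append path enabled by this change, mirroring the calls used in TestStripedFileAppend above. It assumes fs.defaultFS points at a running HDFS cluster; the directory, file name, payload, and buffer size are illustrative only, not part of the commit. Append without CreateFlag.NEW_BLOCK on an erasure-coded file is still rejected by the NameNode with UnsupportedOperationException, as enforced in FSDirAppendOp above.

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class StripedAppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // assumes fs.defaultFS points at an HDFS cluster
    Path dir = new Path("/ecAppendDemo");     // illustrative path
    DistributedFileSystem dfs =
        (DistributedFileSystem) dir.getFileSystem(conf);

    dfs.mkdirs(dir);
    dfs.setErasureCodingPolicy(dir, null);    // null selects the default EC policy, as in the test

    Path file = new Path(dir, "file");
    // Initial write creates the striped file.
    try (FSDataOutputStream out = dfs.create(file)) {
      out.write(new byte[]{1, 2, 3});
    }

    // Append to the closed striped file; NEW_BLOCK is required because
    // appending into the last block group of an EC file is still unsupported.
    try (FSDataOutputStream out = dfs.append(file,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
      out.write(new byte[]{4, 5, 6});
    }
  }
}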