Repository: apex-malhar
Updated Branches:
  refs/heads/master b5c003c94 -> 09a65c2f9
APEXMALHAR-2484 Support of PartFileWriter for writing the part files


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/09a65c2f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/09a65c2f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/09a65c2f

Branch: refs/heads/master
Commit: 09a65c2f9f9743cca79bb0b086bab89ca827401e
Parents: b5c003c
Author: chaitanya <[email protected]>
Authored: Wed May 3 14:54:03 2017 +0530
Committer: Ilya Ganelin <[email protected]>
Committed: Fri May 5 15:08:35 2017 -0700

----------------------------------------------------------------------
 .../malhar/lib/io/block/PartFileWriter.java     | 146 ++++++++++++++++
 .../malhar/lib/io/block/PartFileWriterTest.java | 171 +++++++++++++++++++
 2 files changed, 317 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/09a65c2f/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java b/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
new file mode 100644
index 0000000..010e8ff
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.io.block;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockWriter;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Writes blocks into the specified directory.
+ * If f1 is a file of size 10 MB and the block size is 1 MB, then this operator writes the blocks into the
+ * specified directory as f1.part1, f1.part2, ..., f1.part10, where each part is 1 MB.
+ */
+public class PartFileWriter extends BlockWriter implements Operator.IdleTimeHandler
+{
+  protected static String PARTSUFFIX = ".part";
+  @NotNull
+  private String outputDirectoryPath;
+  private Map<Long, MutablePair<Integer, String>> blockInfo = new HashMap<>();
+  private transient List<AbstractBlockReader.ReaderRecord<Slice>> waitingTuples;
+
+  public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> fileMetadataInput = new DefaultInputPort<AbstractFileSplitter.FileMetadata>()
+  {
+    @Override
+    public void process(AbstractFileSplitter.FileMetadata fileMetadata)
+    {
+      blockInfo.clear();
+      long[] blocks = fileMetadata.getBlockIds();
+      String relativePath = fileMetadata.getRelativePath();
+      for (int i = 0; i < blocks.length; i++) {
+        blockInfo.put(blocks[i], new MutablePair<>(i + 1, relativePath));
+      }
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    filePath = outputDirectoryPath;
+    waitingTuples = new LinkedList<>();
+  }
+
+  @Override
+  protected void processTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
+  {
+    // Check whether the FileMetadata for this blockId has been received on the fileMetadataInput port. If not, park the tuple in waitingTuples.
+    if (blockInfo.get(tuple.getBlockId()) == null) {
+      waitingTuples.add(tuple);
+      return;
+    }
+    super.processTuple(tuple);
+  }
+
+  @Override
+  protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
+  {
+    MutablePair<Integer,String> blockId = blockInfo.get(tuple.getBlockId());
+    return blockId.getRight() + PARTSUFFIX + blockId.getLeft();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    processWaitBlocks();
+    waitingTuples.clear();
+    super.endWindow();
+  }
+
+  @Override
+  public void finalizeFile(String fileName) throws IOException
+  {
+    MutablePair<Integer,String> blockId = blockInfo.get(Long.parseLong(fileName));
+    super.finalizeFile(blockId.getRight() + PARTSUFFIX + blockId.getLeft());
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    processWaitBlocks();
+  }
+
+  /**
+   * Process the blocks which are in the wait state.
+   */
+  protected void processWaitBlocks()
+  {
+    Iterator<AbstractBlockReader.ReaderRecord<Slice>> waitIterator = waitingTuples.iterator();
+    while (waitIterator.hasNext()) {
+      AbstractBlockReader.ReaderRecord<Slice> blockData = waitIterator.next();
+      if (blockInfo.get(blockData.getBlockId()) != null) {
+        super.processTuple(blockData);
+        waitIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Return the path of the output directory for storing part files.
+   * @return outputDirectoryPath
+   */
+  public String getOutputDirectoryPath()
+  {
+    return outputDirectoryPath;
+  }
+
+  /**
+   * Specify the path of the output directory for storing part files.
+   * @param outputDirectoryPath given outputDirectoryPath
+   */
+  public void setOutputDirectoryPath(String outputDirectoryPath)
+  {
+    this.outputDirectoryPath = outputDirectoryPath;
+  }
+}


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/09a65c2f/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java b/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
new file mode 100644
index 0000000..784ab39
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.io.block;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+public class PartFileWriterTest
+{
+
+  public static final int BLOCK_SIZE = 5;
+  public static String FILE_NAME = "FILE";
+
+  public static final String[] FILE_CONTENTS = {"abcdefgh", "pqrst", "xyz", "ABCDEFGHIJKLMNOPQRSTUVWXYZ",
+      "0123456789" };
+
+  private class TestMeta extends TestWatcher
+  {
+    String outputPath;
+    // Maintain the sequence of records and process them one by one through the input port of PartFileWriter.
+    List<List<AbstractBlockReader.ReaderRecord<Slice>>> blockDataList = new ArrayList<>(FILE_CONTENTS.length);
+    Map<Long, String> blockIdToExpectedContent = Maps.newHashMap();
+
+    PartFileWriter underTest;
+    File blocksDir;
+    Context.OperatorContext context;
+    List<AbstractFileSplitter.FileMetadata> fileMetadatas = new ArrayList<>();
+
+    /* (non-Javadoc)
+     * @see org.junit.rules.TestWatcher#starting(org.junit.runner.Description)
+     */
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      outputPath = new File("target/" + description.getClassName() + "/" + description.getMethodName()).getPath();
+
+      underTest = new PartFileWriter();
+      underTest.setOutputDirectoryPath(outputPath);
+
+      try {
+        File outDir = new File(outputPath);
+        FileUtils.forceMkdir(outDir);
+
+        blocksDir = new File(outputPath);
+        blocksDir.mkdirs();
+
+        populateBlocks();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    protected void populateBlocks()
+    {
+      long blockId = 1000;
+      for (int i = 0; i < FILE_CONTENTS.length; i++) {
+        int blockIndex = 0;
+        List<AbstractBlockReader.ReaderRecord<Slice>> blockList = new ArrayList<>();
+        AbstractFileSplitter.FileMetadata fileMetadata = new AbstractFileSplitter.FileMetadata(outputPath);
+        fileMetadata.setRelativePath(FILE_NAME + i);
+        String fileContents = FILE_CONTENTS[i];
+        int fileLength = fileContents.length();
+        int noOfBlocks = ((fileLength / BLOCK_SIZE) + (((fileLength % BLOCK_SIZE) == 0) ? 0 : 1));
+        long[] blockIds = new long[noOfBlocks];
+        for (int offset = 0; offset < fileLength; offset += BLOCK_SIZE, blockId++) {
+          String blockContents;
+          if (offset + BLOCK_SIZE < fileLength) {
+            blockContents = fileContents.substring(offset, offset + BLOCK_SIZE);
+          } else {
+            blockContents = fileContents.substring(offset);
+          }
+
+          AbstractBlockReader.ReaderRecord<Slice> readerRecord = new AbstractBlockReader.ReaderRecord<Slice>(blockId, new Slice(blockContents.getBytes()));
+          blockIds[blockIndex] = blockId;
+          blockIndex++;
+          blockIdToExpectedContent.put(blockId, blockContents);
+          blockList.add(readerRecord);
+        }
+        blockDataList.add(blockList);
+        fileMetadata.setBlockIds(blockIds);
+        fileMetadata.setNumberOfBlocks(noOfBlocks);
+        fileMetadatas.add(fileMetadata);
+      }
+    }
+
+    /* (non-Javadoc)
+     * @see org.junit.rules.TestWatcher#finished(org.junit.runner.Description)
+     */
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+
+      try {
+        FileUtils.deleteDirectory(new File(outputPath));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public PartFileWriterTest.TestMeta testMeta = new PartFileWriterTest.TestMeta();
+
+  @Test
+  public void testBlockWriting() throws IOException
+  {
+    Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.DAGContext.APPLICATION_ID, "PartitionWriterTest");
+    attributes.put(DAG.DAGContext.APPLICATION_PATH, testMeta.outputPath);
+    testMeta.context = mockOperatorContext(1, attributes);
+
+    testMeta.underTest.setup(testMeta.context);
+    for (int fileIndex = 0; fileIndex < FILE_CONTENTS.length; fileIndex++) {
+      testMeta.underTest.beginWindow(fileIndex);
+      testMeta.underTest.fileMetadataInput.process(testMeta.fileMetadatas.get(fileIndex));
+      for (int blockIndex = 0; blockIndex < testMeta.fileMetadatas.get(fileIndex).getNumberOfBlocks(); blockIndex++) {
+        testMeta.underTest.input.process(testMeta.blockDataList.get(fileIndex).get(blockIndex));
+      }
+      testMeta.underTest.endWindow();
+    }
+    testMeta.underTest.committed(2);
+    testMeta.underTest.teardown();
+    File[] blockFileNames = testMeta.blocksDir.listFiles();
+    for (File blockFile : blockFileNames) {
+      int fileIndex = Integer.parseInt(blockFile.getName().split("\\.")[0].replaceAll(FILE_NAME, ""));
+      // Strip the leading "." from PartFileWriter.PARTSUFFIX, since the name has already been split on ".".
+      int blockIndex = Integer.parseInt(blockFile.getName().split("\\.")[1].replaceAll(PartFileWriter.PARTSUFFIX.substring(1), ""));
+      String expected = testMeta.blockIdToExpectedContent.get(testMeta.fileMetadatas.get(fileIndex).getBlockIds()[blockIndex - 1]);
+      Assert.assertEquals(expected, FileUtils.readFileToString(blockFile));
+    }
+  }
+}
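
For context beyond the diff: below is a minimal, untested sketch of how PartFileWriter might be wired into an application DAG, downstream of the existing FileSplitterInput and FSSliceReader operators from the Malhar library. The application class, stream names, and directory paths are hypothetical illustrations, not part of this commit.

package com.example.apps; // hypothetical package, not in this commit

import org.apache.apex.malhar.lib.io.block.PartFileWriter;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.block.FSSliceReader;
import com.datatorrent.lib.io.fs.FileSplitterInput;

public class PartFileApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Split input files into fixed-size blocks; emits per-file and per-block metadata.
    FileSplitterInput splitter = dag.addOperator("Splitter", new FileSplitterInput());
    splitter.getScanner().setFiles("/tmp/input"); // hypothetical input directory

    // Read the bytes of each block emitted by the splitter.
    FSSliceReader reader = dag.addOperator("BlockReader", new FSSliceReader());

    // Write each block as <relativePath>.part<n> under the output directory.
    PartFileWriter writer = dag.addOperator("PartWriter", new PartFileWriter());
    writer.setOutputDirectoryPath("/tmp/output"); // hypothetical output directory

    dag.addStream("blockMetadata", splitter.blocksMetadataOutput, reader.blocksMetadataInput);
    dag.addStream("fileMetadata", splitter.filesMetadataOutput, writer.fileMetadataInput);
    dag.addStream("blockData", reader.messages, writer.input);
  }
}

Note that the operator must see a file's FileMetadata on fileMetadataInput before it can name and write that file's blocks; block tuples that arrive early are parked in waitingTuples and retried during idle time and at endWindow, which is why PartFileWriter implements Operator.IdleTimeHandler.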
