Repository: apex-malhar Updated Branches: refs/heads/master 8bfaf7896 -> 2e2dfc5c0
APEXMALHAR-2116
1. Added FS record reader operator, module, and tests
2. Javadoc improvements
3. Added default values
4. Incorporated review comments


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/02d657c2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/02d657c2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/02d657c2

Branch: refs/heads/master
Commit: 02d657c215bb5f1c285e9bf2a1e67fdd285eb662
Parents: aaa4464
Author: yogidevendra <[email protected]>
Authored: Mon Jun 20 11:17:08 2016 +0530
Committer: yogidevendra <[email protected]>
Committed: Mon Jul 25 11:52:45 2016 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/fs/FSRecordReader.java     | 149 ++++++++
 .../malhar/lib/fs/FSRecordReaderModule.java    | 347 +++++++++++++++++++
 .../apex/malhar/lib/fs/FSRecordReaderTest.java | 217 ++++++++++++
 3 files changed, 713 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02d657c2/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
new file mode 100644
index 0000000..44168c8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from a file system in
+ * parallel (without ordering guarantees between tuples). Records can be
+ * delimited (e.g. by newline) or of fixed width. Output tuples are byte[].
+ *
+ * Typically, this operator is connected to the output of FileSplitterInput to
+ * read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public static enum RECORD_READER_MODE
+  {
+    DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initializes the appropriate reader context based on the mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+      fixedBytesReaderContext.setLength(recordLength);
+      readerContext = fixedBytesReaderContext;
+    } else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+    }
+  }
+
+  /**
+   * Reads the block data and emits records based on the reader context
+   *
+   * @param blockMetadata
+   *          block
+   * @throws IOException
+   */
+  @Override
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+    readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+    ReaderContext.Entity entity;
+    while ((entity = readerContext.next()) != null) {
+
+      counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+      byte[] record = entity.getRecord();
+
+      if (record != null) {
+        counters.getCounter(ReaderCounterKeys.RECORDS).increment();
+        records.emit(record);
+      }
+    }
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @param mode
+   *          Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * Length for fixed width record
+   *
+   * @param recordLength
+   *          length of the fixed width record
+   */
+  public void setRecordLength(int recordLength)
+  {
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD && recordLength <= 0) {
+      throw new IllegalArgumentException("recordLength should be greater than 0.");
+    }
+    this.recordLength = recordLength;
+  }
+
+  /**
+   * Length for fixed width record
+   *
+   * @return record length
+   */
+  public int getRecordLength()
+  {
+    return recordLength;
+  }
+
+}
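For illustration (not part of this commit): a minimal sketch of wiring this operator downstream of FileSplitterInput, which is essentially what the FSRecordReaderModule below automates. The application class name and the /tmp/input path are hypothetical; the ports and setters are the ones used by the module's populateDAG.

  import org.apache.apex.malhar.lib.fs.FSRecordReader;

  import org.apache.hadoop.conf.Configuration;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;
  import com.datatorrent.lib.io.fs.FileSplitterInput;

  public class RecordReaderDemo implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", new FileSplitterInput());
      fileSplitter.getScanner().setFiles("/tmp/input"); // hypothetical input directory

      FSRecordReader recordReader = dag.addOperator("RecordReader", new FSRecordReader());
      recordReader.setBasePath("/tmp/input");

      // Block metadata from the splitter drives the reader; one byte[] per
      // record is emitted on recordReader.records (newline-delimited in the
      // default DELIMITED_RECORD mode).
      dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
    }
  }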
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02d657c2/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
new file mode 100644
index 0000000..b1df744
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from a file system. Records
+ * can be read in parallel using multiple partitions of the record reader
+ * operator. (Ordering is not guaranteed when records are read in parallel.)
+ *
+ * The input directory is scanned at the specified interval to poll for new
+ * data.
+ *
+ * The module reads data in parallel; the following parameters can be
+ * configured:<br/>
+ * 1. files: list of file(s)/directories to read<br/>
+ * 2. filePatternRegularExp: files with names matching the given regex will be
+ * read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in
+ * the input directory<br/>
+ * 4. recursive: if true, scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of a file<br/>
+ * 6. readersCount: count of readers to read the input file<br/>
+ * 7. sequentialFileRead: if true, each reader partition reads a different file
+ * instead of reading different offsets of the same file (file level
+ * parallelism instead of block level parallelism)<br/>
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(1)
+  private long scanIntervalMillis = 5000;
+  private boolean recursive = true;
+  private boolean sequentialFileRead = false;
+  @Min(1)
+  private int readersCount = 1;
+  @Min(1)
+  protected int blocksThreshold = 1;
+
+  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Creates an instance of FileSplitterInput
+   *
+   * @return FileSplitterInput instance
+   */
+  public FileSplitterInput createFileSplitter()
+  {
+    return new FileSplitterInput();
+  }
+
+  /**
+   * Creates an instance of the record reader
+   *
+   * @return FSRecordReader instance
+   */
+  public FSRecordReader createRecordReader()
+  {
+    FSRecordReader recordReader = new FSRecordReader();
+    recordReader.setMode(mode);
+    recordReader.setRecordLength(recordLength);
+
+    return recordReader;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSRecordReader recordReader = dag.addOperator("BlockReader", createRecordReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
+
+    if (sequentialFileRead) {
+      dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
+          new SequentialFileBlockMetadataCodec());
+    }
+
+    FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();
+    fileScanner.setFiles(files);
+    if (scanIntervalMillis != 0) {
+      fileScanner.setScanIntervalMillis(scanIntervalMillis);
+    }
+    fileScanner.setRecursive(recursive);
+    if (filePatternRegularExp != null) {
+      fileScanner.setFilePatternRegularExp(filePatternRegularExp);
+    }
+
+    recordReader.setBasePath(files);
+    if (readersCount != 0) {
+      dag.setAttribute(recordReader, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner<FSSliceReader>(readersCount));
+    }
+    fileSplitter.setBlocksThreshold(blocksThreshold);
+    records.set(recordReader.records);
+  }
+
+  /**
+   * A comma separated list of directories to scan. If the path is not fully
+   * qualified, the default file system is used. A fully qualified path can be
+   * provided to scan directories in other filesystems.
+   *
+   * @param files
+   *          files
+   */
+  public void setFiles(String files)
+  {
+    this.files = files;
+  }
+
+  /**
+   * Gets the files to be scanned.
+   *
+   * @return files to be scanned.
+   */
+  public String getFiles()
+  {
+    return files;
+  }
+
+  /**
+   * Gets the regular expression for file names to split
+   *
+   * @return regular expression
+   */
+  public String getFilePatternRegularExp()
+  {
+    return filePatternRegularExp;
+  }
+
+  /**
+   * Only files with names matching the given Java regular expression are split
+   *
+   * @param filePatternRegexp
+   *          regular expression
+   */
+  public void setFilePatternRegularExp(String filePatternRegexp)
+  {
+    this.filePatternRegularExp = filePatternRegexp;
+  }
+
+  /**
+   * Gets the scan interval in milliseconds, the interval between two scans to
+   * discover new files in the input directory
+   *
+   * @return scan interval in milliseconds
+   */
+  public long getScanIntervalMillis()
+  {
+    return scanIntervalMillis;
+  }
+
+  /**
+   * Sets the scan interval in milliseconds, the interval between two scans to
+   * discover new files in the input directory
+   *
+   * @param scanIntervalMillis
+   *          scan interval in milliseconds
+   */
+  public void setScanIntervalMillis(long scanIntervalMillis)
+  {
+    this.scanIntervalMillis = scanIntervalMillis;
+  }
+
+  /**
+   * Returns whether the scan is recursive
+   *
+   * @return true if input directories are scanned recursively
+   */
+  public boolean isRecursive()
+  {
+    return recursive;
+  }
+
+  /**
+   * Sets whether the scan should be recursive
+   *
+   * @param recursive
+   *          true to scan input directories recursively
+   */
+  public void setRecursive(boolean recursive)
+  {
+    this.recursive = recursive;
+  }
+
+  /**
+   * Gets the readers count
+   *
+   * @return readersCount
+   */
+  public int getReadersCount()
+  {
+    return readersCount;
+  }
+
+  /**
+   * Sets the static count of readers to read the input file
+   *
+   * @param readersCount
+   *          count of readers
+   */
+  public void setReadersCount(int readersCount)
+  {
+    this.readersCount = readersCount;
+  }
+
+  /**
+   * Returns whether sequential file read is enabled
+   *
+   * @return sequentialFileRead
+   */
+  public boolean isSequentialFileRead()
+  {
+    return sequentialFileRead;
+  }
+
+  /**
+   * Sets whether files should be read sequentially
+   *
+   * @param sequentialFileRead
+   *          true for file level parallelism, false for block level
+   *          parallelism
+   */
+  public void setSequentialFileRead(boolean sequentialFileRead)
+  {
+    this.sequentialFileRead = sequentialFileRead;
+  }
+
+  /**
+   * Sets the number of blocks to be emitted per window.<br/>
+   * Emitting too many blocks per window can overwhelm the downstream
+   * operators. Set this value considering blockSize and readersCount.
+   *
+   * @param threshold
+   *          number of blocks per window
+   */
+  public void setBlocksThreshold(int threshold)
+  {
+    this.blocksThreshold = threshold;
+  }
+
+  /**
+   * Gets the number of blocks to be emitted per window.<br/>
+   * Emitting too many blocks per window can overwhelm the downstream
+   * operators. Set this value considering blockSize and readersCount.
+   *
+   * @return blocks threshold
+   */
+  public int getBlocksThreshold()
+  {
+    return blocksThreshold;
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @param mode
+   *          Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Length for fixed width record
+   *
+   * @return record length
+   */
+  public int getRecordLength()
+  {
+    return recordLength;
+  }
+
+  /**
+   * Length for fixed width record
+   *
+   * @param recordLength
+   *          length of the fixed width record
+   */
+  public void setRecordLength(int recordLength)
+  {
+    this.recordLength = recordLength;
+  }
+
+  /**
+   * Stream codec used when sequentialFileRead is enabled: block metadata is
+   * partitioned by its hashcode, with the intent that blocks of the same file
+   * are read by the same partition (file level parallelism).
+   */
+  public static class SequentialFileBlockMetadataCodec
+      extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
+  {
+    @Override
+    public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata)
+    {
+      return fileBlockMetadata.hashCode();
+    }
+  }
+}
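For completeness (not part of this commit): a minimal sketch of using the module from an application. The application class name, module name, path, and values are hypothetical; the setters are the ones defined above.

  import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
  import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;

  import org.apache.hadoop.conf.Configuration;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;

  public class RecordReaderModuleDemo implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      FSRecordReaderModule reader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
      reader.setFiles("/tmp/input");                          // comma separated files/directories
      reader.setMode(RECORD_READER_MODE.FIXED_WIDTH_RECORD);  // default is DELIMITED_RECORD
      reader.setRecordLength(8);                              // required for fixed width records
      reader.setReadersCount(4);                              // static partition count for the reader
      reader.setBlocksThreshold(1);                           // blocks emitted per window
      // reader.records can now be connected to any operator consuming byte[]
    }
  }

The tests below set the same knobs through configuration instead, using dt.operator.HDFSRecordReaderModule.prop.* keys.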
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/02d657c2/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
new file mode 100644
index 0000000..fdd888c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+
+public class FSRecordReaderTest
+{
+  private String inputDir;
+  static String outputDir;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop";
+  private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+  }
+
+  @Test
+  public void testDelimitedRecords() throws Exception
+  {
+    DelimitedApplication app = new DelimitedApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.files", inputDir);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000);
+    lc.shutdown();
+  }
+
+  public static class DelimitedValidator extends BaseOperator
+  {
+    Set<String> records = new HashSet<String>();
+
+    public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>()
+    {
+      @Override
+      public void process(byte[] tuple)
+      {
+        String record = new String(tuple);
+        records.add(record);
+      }
+    };
+
+    @Override
+    public void teardown()
+    {
+      Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+      expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+
+      Assert.assertEquals(expectedRecords, records);
+    }
+  }
+
+  private static class DelimitedApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
+      recordReader.setMode(RECORD_READER_MODE.DELIMITED_RECORD);
+      DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+  }
+
+  @Test
+  public void testFixedWidthRecords() throws Exception
+  {
+    FixedWidthApplication app = new FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.files", inputDir);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.recordLength", "8");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000);
+    lc.shutdown();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingRecordLength() throws Exception
+  {
+    FixedWidthApplication app = new FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.files", inputDir);
+    //Should give IllegalArgumentException since recordLength is not set
+    //conf.set("dt.operator.HDFSRecordReaderModule.prop.recordLength", "8");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.HDFSRecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000);
+    lc.shutdown();
+  }
+
+  public static class FixedWidthValidator extends BaseOperator
+  {
+    Set<String> records = new HashSet<String>();
+
+    public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>()
+    {
+      @Override
+      public void process(byte[] tuple)
+      {
+        String record = new String(tuple);
+        records.add(record);
+      }
+    };
+
+    @Override
+    public void teardown()
+    {
+      String[] expected = {"1234\n567", "890\nabcd", "e\nfgh\ni\n", "jklmop", "qr\nstuvw", "\nxyz\n"};
+
+      Set<String> expectedRecords = new HashSet<String>(Arrays.asList(expected));
+
+      Assert.assertEquals(expectedRecords, records);
+    }
+  }
+
+  private static class FixedWidthApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
+      recordReader.setMode(RECORD_READER_MODE.FIXED_WIDTH_RECORD);
+      FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(FSRecordReaderTest.class);
+
+}
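A side note on the fixed width expectations above: in FIXED_WIDTH_RECORD mode each file is cut every recordLength bytes with no regard for newlines, and the last record of a file may be shorter. A hypothetical helper (not part of the patch) that reproduces the expected values for FILE_1_DATA with recordLength 8:

  import java.util.ArrayList;
  import java.util.List;

  public class FixedWidthSplitSketch
  {
    // Cuts data every recordLength chars; the trailing record may be shorter.
    static List<String> fixedWidthSplit(String data, int recordLength)
    {
      List<String> records = new ArrayList<>();
      for (int i = 0; i < data.length(); i += recordLength) {
        records.add(data.substring(i, Math.min(i + recordLength, data.length())));
      }
      return records;
    }

    public static void main(String[] args)
    {
      // 30 chars split at 8 -> "1234\n567", "890\nabcd", "e\nfgh\ni\n", "jklmop"
      System.out.println(fixedWidthSplit("1234\n567890\nabcde\nfgh\ni\njklmop", 8));
    }
  }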
