[
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356900#comment-15356900
]
ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------
Github user yogidevendra commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/326#discussion_r69111489
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
---
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records
can
+ * be read in parallel using multiple partitions of record reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ *
+ * The module reads data in parallel, following parameters can be
configured
+ * <br/>
+ * 1. files: list of file(s)/directories to read<br/>
+ * 2. filePatternRegularExp: Files names matching given regex will be
read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files
in
+ * input directory<br/>
+ * 4. recursive: whether to scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of file<br/>
+ * 6. readersCount: count of readers to read input file<br/>
+ * 7. sequentialFileRead: whether to emit file blocks in sequence<br/>
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
[email protected]
+public class FSRecordReaderModule implements Module
+{
+ // Comma-separated list of files/directories to scan (required, non-empty).
+ @NotNull
+ @Size(min = 1)
+ private String files;
+ // Optional regex; when set, only matching file names are read.
+ private String filePatternRegularExp;
+ // Interval between two directory scans, in ms; 0 keeps the scanner's default.
+ @Min(0)
+ private long scanIntervalMillis;
+ // Scan input directories recursively by default.
+ private boolean recursive = true;
+ // When true, a stream codec routes blocks so a file is read in sequence.
+ private boolean sequentialFileRead = false;
+ // Number of reader partitions; 0 means leave the reader unpartitioned.
+ private int readersCount;
+ // Maximum number of blocks the splitter emits per window (required, >= 1).
+ @Min(1)
+ protected int blocksThreshold;
+
+ // Module output port; proxied to the internal record reader's output in populateDAG().
+ public final transient ProxyOutputPort<byte[]> records = new
ProxyOutputPort<byte[]>();
+
+ /**
+ * Criteria for record split
+ */
+ private RECORD_READER_MODE mode;
+
+ /**
+ * Length for fixed width record
+ */
+ private int recordLength;
+
+  /**
+   * Factory for the file-splitter operator used by this module.
+   * Subclasses may override to supply a customized splitter.
+   *
+   * @return a new {@link FileSplitterInput}
+   */
+  public FileSplitterInput createFileSplitter()
+  {
+    FileSplitterInput splitter = new FileSplitterInput();
+    return splitter;
+  }
+
+  /**
+   * Factory for the record-reader operator, pre-configured with the record
+   * split mode and fixed record length currently set on this module.
+   *
+   * @return a new configured {@link FSRecordReader}
+   */
+  public FSRecordReader createBlockReader()
+  {
+    FSRecordReader reader = new FSRecordReader();
+    reader.setMode(mode);
+    reader.setRecordLength(recordLength);
+    return reader;
+  }
+
+  /**
+   * Wires the module's internal DAG: a FileSplitter discovers files and emits
+   * block metadata, which the (optionally partitioned) FSRecordReader consumes
+   * to emit records on this module's {@code records} output port.
+   *
+   * @param dag           DAG being populated
+   * @param configuration launch configuration (not used here)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
+
+    if (sequentialFileRead) {
+      // NOTE(review): codec presumably keys block metadata by file so that one
+      // partition reads a file's blocks in order -- confirm against codec impl.
+      dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
+          new SequentialFileBlockMetadataCodec());
+    }
+
+    FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();
+    fileScanner.setFiles(files);
+    // 0 means "not configured": keep the scanner's default scan interval.
+    if (scanIntervalMillis != 0) {
+      fileScanner.setScanIntervalMillis(scanIntervalMillis);
+    }
+    fileScanner.setRecursive(recursive);
+    if (filePatternRegularExp != null) {
+      // Reuse the local scanner reference (was fileSplitter.getScanner()) for
+      // consistency with the surrounding configuration calls.
+      fileScanner.setFilePatternRegularExp(filePatternRegularExp);
+    }
+
+    recordReader.setBasePath(files);
+    // 0 means "not configured": leave the reader unpartitioned.
+    if (readersCount != 0) {
+      dag.setAttribute(recordReader, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner<FSSliceReader>(readersCount));
+    }
+    fileSplitter.setBlocksThreshold(blocksThreshold);
+    // Expose the internal reader's output through the module's proxy port.
+    records.set(recordReader.records);
+  }
+
+ /**
+ * A comma separated list of directories to scan. If the path is not
fully
+ * qualified the default file system is used. A fully qualified path can
be
+ * provided to scan directories in other filesystems.
+ *
+ * @param files
+ * files
+ */
+ public void setFiles(String files)
+ {
+ this.files = files;
--- End diff --
I tried to keep it consistent with FSInputModule.java. If you feel that
readability/intuitive names are more important than having consistency across
operators then I am fine with changing this field name.
> File Record reader module
> -------------------------
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Yogi Devendra
> Assignee: Yogi Devendra
>
> This will be useful for the use cases which involve reading from files "line
> by line" in parallel and emitting each line as a separate tuple.
> Proposal is to have new Module which would allow users to monitor
> directories, read files and emit data records(tuple). Records are based on
> record separator (e.g. newline) or fixed size (no of bytes).
> Plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode for
> FIXED_LENGTH or SEPARATOR_BASED records.
> 3. Using appropriate ReaderContext based on mode.
> 4. New module FileRecordReaderModule which wraps (FileSplitter (existing) +
> FileRecordReader operator)
> Reason for having different operator than BlockReader is because output port
> signature is different than BlockReader.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)