[
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387621#comment-15387621
]
ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------
Github user yogidevendra commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/326#discussion_r71698302
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem. Records
+ * can be read in parallel using multiple partitions of the record reader
+ * operator. (Ordering is not guaranteed when records are read in parallel.)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ *
+ * The module reads data in parallel; the following parameters can be
+ * configured:<br/>
+ * 1. files: list of file(s)/directories to read<br/>
+ * 2. filePatternRegularExp: files with names matching the given regex will
+ * be read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files
+ * in the input directory<br/>
+ * 4. recursive: if true, scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of a file<br/>
+ * 6. readersCount: number of readers reading the input file<br/>
+ * 7. sequentialFileRead: if true, each reader partition will read a
+ * different file instead of reading different offsets of the same file<br/>
+ * (file-level parallelism instead of block-level parallelism)<br/>
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
[email protected]
+public class FSRecordReaderModule implements Module
+{
+ @NotNull
+ @Size(min = 1)
+ private String files;
+ private String filePatternRegularExp;
+ @Min(1)
+ private long scanIntervalMillis = 5000;
+ private boolean recursive = true;
+ private boolean sequentialFileRead = false;
+ @Min(1)
+ private int readersCount = 1;
+ @Min(1)
+ protected int blocksThreshold = 1;
+
+ public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
+
+ /**
+ * Criteria for record split
+ */
+ private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+ /**
+ * Length for fixed width record
+ */
+ @Min(1)
+ private int recordLength;
--- End diff --
@amberarrow As per your suggestion, removed the @Min annotation for record
length.
Added validation in the code to ensure that the value is positive when the
mode is FIXED_WIDTH_RECORD.
Also added a test case for validating this.
Could you have a look and merge this if it looks OK?
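A minimal sketch of the validation described above: the @Min annotation on
recordLength is replaced by a runtime check that only applies in fixed-width
mode. The enum and method names here are assumptions for illustration, not
the exact apex-malhar code.

```java
public class RecordLengthValidation
{
  // Assumed stand-in for FSRecordReader.RECORD_READER_MODE
  enum RecordReaderMode { DELIMITED_RECORD, FIXED_WIDTH_RECORD }

  static void validateRecordLength(RecordReaderMode mode, int recordLength)
  {
    // recordLength is only meaningful for fixed-width records,
    // so delimited mode skips the check entirely
    if (mode == RecordReaderMode.FIXED_WIDTH_RECORD && recordLength <= 0) {
      throw new IllegalArgumentException(
          "recordLength must be positive when mode is FIXED_WIDTH_RECORD");
    }
  }

  public static void main(String[] args)
  {
    validateRecordLength(RecordReaderMode.DELIMITED_RECORD, 0);    // OK: length unused
    validateRecordLength(RecordReaderMode.FIXED_WIDTH_RECORD, 16); // OK: positive
    try {
      validateRecordLength(RecordReaderMode.FIXED_WIDTH_RECORD, 0);
      System.out.println("missed");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected"); // prints "rejected"
    }
  }
}
```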
> File Record reader module
> -------------------------
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Yogi Devendra
> Assignee: Yogi Devendra
>
> This will be useful for use cases which involve reading from files "line
> by line" in parallel and emitting each line as a separate tuple.
> The proposal is to have a new Module which would allow users to monitor
> directories, read files, and emit data records (tuples). Records are based on
> a record separator (e.g. newline) or a fixed size (number of bytes).
> Plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode for
> FIXED_LENGTH or SEPARATOR_BASED records.
> 3. Use the appropriate ReaderContext based on the mode.
> 4. New module FileRecordReaderModule which wraps FileSplitter (existing) +
> the FileRecordReader operator.
> The reason for having a different operator than BlockReader is that the
> output port signature is different from BlockReader's.
>
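The two record-split modes the issue describes (separator-based vs. fixed
length) can be sketched as follows. This is illustrative only, not the Malhar
implementation; class and method names are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecordSplitModes
{
  // SEPARATOR_BASED: cut the buffer at each delimiter byte (e.g. newline)
  static List<byte[]> splitDelimited(byte[] data, byte delimiter)
  {
    List<byte[]> records = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < data.length; i++) {
      if (data[i] == delimiter) {
        records.add(Arrays.copyOfRange(data, start, i));
        start = i + 1;
      }
    }
    if (start < data.length) {
      records.add(Arrays.copyOfRange(data, start, data.length)); // trailing record
    }
    return records;
  }

  // FIXED_LENGTH: cut the buffer every recordLength bytes
  static List<byte[]> splitFixedWidth(byte[] data, int recordLength)
  {
    List<byte[]> records = new ArrayList<>();
    for (int start = 0; start < data.length; start += recordLength) {
      int end = Math.min(start + recordLength, data.length);
      records.add(Arrays.copyOfRange(data, start, end));
    }
    return records;
  }

  public static void main(String[] args)
  {
    byte[] data = "ab\ncd\nef".getBytes(StandardCharsets.US_ASCII);
    System.out.println(splitDelimited(data, (byte) '\n').size()); // prints 3
    System.out.println(splitFixedWidth(data, 4).size());          // prints 2
  }
}
```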
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)