[
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352095#comment-15352095
]
ASF GitHub Bot commented on APEXMALHAR-2116:
--------------------------------------------
Github user amberarrow commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/326#discussion_r68678627
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator reads records/tuples from a filesystem in parallel (without
+ * ordering guarantees between tuples). Records are either delimited (e.g. by
+ * newline) or fixed-width. Output tuples are byte[].
+ *
+ * Typically, this operator is connected to the output of FileSplitterInput to
+ * read records in parallel.
+ */
[email protected]
+public class FSRecordReader extends FSSliceReader
+{
+ /**
+ * Record reader mode decides how to split the records.
+ */
+ public static enum RECORD_READER_MODE
+ {
+ DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+ }
+
+ /**
+ * Criteria for record split
+ */
+ private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+ /**
+ * Length for fixed width record
+ */
+ private int recordLength;
+
+ /**
+ * Port to emit individual records/tuples as byte[]
+ */
+ public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+ /**
+ * Initialize appropriate reader context based on mode selection
+ */
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+ ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext =
+ new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+ fixedBytesReaderContext.setLength(recordLength);
+ readerContext = fixedBytesReaderContext;
+ } else {
+ readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+ }
+ }
+
+ /**
+ * Read the block data and emit records based on reader context
+ *
+ * @param blockMetadata
+ * block
+ * @throws IOException
+ */
+ @Override
+ protected void readBlock(BlockMetadata blockMetadata) throws IOException
+ {
+ readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+ ReaderContext.Entity entity;
+ while ((entity = readerContext.next()) != null) {
+
+ counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+ byte[] record = entity.getRecord();
+
+ if (record != null) {
+ counters.getCounter(ReaderCounterKeys.RECORDS).increment();
+ records.emit(record);
+ }
+ }
+ }
+
+ /**
+ * Criteria for record split
+ *
+ * @param mode
+ * Mode
+ */
+ public void setMode(RECORD_READER_MODE mode)
+ {
+ this.mode = mode;
+ }
+
+ /**
+ * Criteria for record split
+ *
+ * @return mode
+ */
+ public RECORD_READER_MODE getMode()
+ {
+ return mode;
+ }
+
+ /**
+ * Length for fixed width record
+ *
+ * @param recordLength length in bytes of each fixed-width record
+ */
+ public void setRecordLength(int recordLength)
+ {
+ this.recordLength = recordLength;
+ }
+
+ /**
+ * Length for fixed width record
+ *
+ * @return record length
+ */
+ public int getRecordLength()
+ {
+ return recordLength;
+ }
+
+ static {
+ /**
+ * Code for enabling BeanUtils to accept comma separated string to
+ * initialize FIELD_TYPE[]
+ */
+ class RecordReaderModeConverter extends AbstractConverter
--- End diff --
Could you provide more detail on why this is necessary and how it is used?
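For readers following the thread: converters of this kind let a framework turn a plain string property value (e.g. from an XML or properties configuration file) into a typed field such as the mode enum. Below is a minimal, self-contained sketch of the idea without the BeanUtils dependency; the class and method names are illustrative, not the PR's actual code.

```java
import java.util.Locale;

// Illustrative sketch: what a String -> enum converter buys you when
// operator properties arrive as plain strings from a config file.
// (Hypothetical names; not the actual apex-malhar converter.)
public class ModeConverterSketch {

  enum RecordReaderMode { DELIMITED_RECORD, FIXED_WIDTH_RECORD }

  // A BeanUtils-style converter maps the raw property string to the enum
  // constant; registering such a converter once lets the framework populate
  // enum-typed fields directly from configuration.
  static RecordReaderMode convert(String propertyValue) {
    // Normalize whitespace and case so lenient config values still resolve.
    return RecordReaderMode.valueOf(propertyValue.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    // e.g. <property name="mode">FIXED_WIDTH_RECORD</property>
    RecordReaderMode mode = convert(" fixed_width_record ");
    System.out.println(mode); // FIXED_WIDTH_RECORD
  }
}
```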
> File Record reader module
> -------------------------
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Yogi Devendra
> Assignee: Yogi Devendra
>
> This will be useful for use cases that involve reading from files "line by
> line" in parallel and emitting each line as a separate tuple.
> The proposal is to add a new module which allows users to monitor
> directories, read files, and emit data records (tuples). Records are based on
> a record separator (e.g. newline) or a fixed size (number of bytes).
> Plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode:
> FIXED_LENGTH or SEPARATOR_BASED records.
> 3. Use the appropriate ReaderContext based on the mode.
> 4. New module FileRecordReaderModule which wraps FileSplitter (existing) +
> the FileRecordReader operator.
> The reason for having an operator separate from BlockReader is that its
> output port signature differs from BlockReader's.
>
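The two record-split modes described in the plan can be illustrated with a minimal, self-contained sketch (plain Java over an in-memory byte array; this is not Malhar's actual ReaderContext implementation, which works over block-aligned streams):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecordSplitSketch {

  // SEPARATOR_BASED mode: split on a delimiter byte such as '\n'.
  static List<byte[]> splitDelimited(byte[] data, byte delimiter) {
    List<byte[]> records = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < data.length; i++) {
      if (data[i] == delimiter) {
        records.add(Arrays.copyOfRange(data, start, i));
        start = i + 1;
      }
    }
    if (start < data.length) {
      // Trailing record with no terminating delimiter.
      records.add(Arrays.copyOfRange(data, start, data.length));
    }
    return records;
  }

  // FIXED_LENGTH mode: every record occupies exactly recordLength bytes.
  static List<byte[]> splitFixedWidth(byte[] data, int recordLength) {
    List<byte[]> records = new ArrayList<>();
    for (int i = 0; i + recordLength <= data.length; i += recordLength) {
      records.add(Arrays.copyOfRange(data, i, i + recordLength));
    }
    return records;
  }

  public static void main(String[] args) {
    byte[] lines = "a,1\nb,2\nc,3".getBytes(StandardCharsets.UTF_8);
    System.out.println(splitDelimited(lines, (byte) '\n').size()); // 3

    byte[] fixed = "AAAABBBBCCCC".getBytes(StandardCharsets.UTF_8);
    System.out.println(splitFixedWidth(fixed, 4).size()); // 3
  }
}
```

The parallelism in the actual module comes from FileSplitterInput handing different file blocks to different reader partitions; within each block, the splitting logic is conceptually what the two methods above show.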
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)