HI All,

We have created tutorials for FileSplitter and BlockReader here:
https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md

Please have a look. Any feedback is appreciated.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <[email protected]>
wrote:

> Hi Vk,
>
> Please find  a CSV block reader here and let me know if you have
> questions. I have also added a test and it seems to be working fine.
>
>
> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
>
> Please note that the BlockReader api has changed from the one you have
> been using considerably.
>
> Thanks,
> Chandni
>
> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <[email protected]>
> wrote:
>
>> Hi vk,
>>
>> I think you don't need to override readBlock() in AbstractBlockReader.
>>
>> A simpler way to do this will be using ReadAheadLineReaderContext as the
>> readerContext and providing implementation of converting bytes to the CSV
>> bean.
>>
>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
>> {
>>   public CSVBeanReader()
>>   {
>>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>>   }
>>
>>   @Override protected CSVBean convertToRecord(byte[] bytes)
>>   {
>>     //TODO: convert bytes to bean
>>     return new CSVBean(bytes);
>>   }
>> }
>>
>> Are you using supercsv? I think there is a way to convert bytes to a CSV
>> Record using it and I may have that example somewhere which I will look up
>> and let you know.
>>
>> Chandni
>>
>>
>>
>> On Tue, Sep 29, 2015 at 2:06 PM, vk <[email protected]>
>> wrote:
>>
>>>  Here is a detailed description of the problem.
>>>
>>>>
>>>> My file size : *7,590,177  bytes*
>>>>
>>>> FIle splitter block size config :
>>>>
>>>>
>>>>  
>>>> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>>>  <value>*16806*</value>
>>>>
>>>>
>>>> *MyBlockReader Implementation:*
>>>>
>>>> @Override
>>>> protected void readBlock(BlockMetadata blockMetadata) throws
>>>> IOException {
>>>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>>> ReaderContext.Entity entity;
>>>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
>>>> encoding),csvPreference);
>>>> while ((entity = readerContext.next()) != null) {
>>>> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>>> Packages record = convertToRecord(entity.getRecord());
>>>> //if record is partial, ignore the record
>>>> if (record != null) {
>>>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>>> data.emit(record);
>>>> }
>>>> }
>>>> }
>>>>
>>>>
>>>> @Override
>>>> protected Packages convertToRecord(byte[] data) {
>>>> Packages bean = null;
>>>> try {
>>>> bean = csvReader.read(Packages.class,Packages.COLS);
>>>> } catch (IOException e) {
>>>> e.printStackTrace();
>>>> }
>>>> return bean;
>>>> }
>>>>
>>>>
>>>> Based on the above, when blocks are created a record might be split
>>>> into two different blocks. When reading the blocks and converting them to
>>>> beans, it has to set the offset values appropriately to merge the split
>>>> record into one and process it. It looks like this implementation
>>>> is already handled in the API when *readerContext.initialize(stream,
>>>> blockMetadata, consecutiveBlock)* is called, but when tried to execute
>>>> with the above snippet, the following error is thrown because of the split
>>>> record. Can you please suggest?
>>>>
>>>> *Exception:*
>>>>
>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>>>> engine.StreamingContainer run - Operator set
>>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>>>> 0,
>>>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>>>> stopped running due to an exception.
>>>> *java.lang.IllegalArgumentException: the nameMapping array and the
>>>> number of columns read should be the same size (nameMapping length = 24,
>>>> columns = 5)*
>>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>>> at
>>>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>>> at
>>>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>>> at
>>>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>>> at
>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>>> at
>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>>
>>>>
>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>>
>>>>
>>>> Routing to dev@apex
>>>>
>>>> Amol
>>>>
>>>>
>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:
>>>>
>>>>> Hi Pramod,
>>>>>
>>>>> thanks for the reply, it is working..
>>>>>
>>>>> And i have one more query on it, How to decide the block size?
>>>>>
>>>>> as per my understanding the
>>>>>
>>>>> noofBlocks=filesize / blocksize
>>>>>
>>>>> By this some records may be split into two blocks, when converting the
>>>>> record we dont have the complete data in one block.
>>>>>
>>>>> how to handle this?
>>>>>
>>>>> thanks in adavance.
>>>>>
>>>>> Thanks -Chiru
>>>>>
>>>>>
>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I would like to read a large file using filesplitter and emit
>>>>>> tuples.So i have writtent the code like below.
>>>>>>
>>>>>>
>>>>>> public class Reader extends
>>>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>>>>
>>>>>> @Override
>>>>>> protected Data convertToRecord(byte[] data)  { ///
>>>>>> }
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> In my application class  i have created the object for filesplitter
>>>>>> and Reader classes and connect through stream.
>>>>>>
>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>>>> reader.blocksMetadataInput)
>>>>>>
>>>>>>
>>>>>> In properties file iam passing the directory path
>>>>>> like 
>>>>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>>>
>>>>>> when i run the application iam getting the below error:
>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>>>> 0,
>>>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>>>> due to setup failure.*
>>>>>> *java.lang.IllegalArgumentException: empty files*
>>>>>>
>>>>>>
>>>>>> Please suggest is my approach is correct or not?
>>>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>>>>>> share any sample code line.
>>>>>>
>>>>>> thanks _Chiranjeevi
>>>>>>
>>>>> --
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Malhar" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at http://groups.google.com/group/malhar-users.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>
>

Reply via email to