Folks,

You can comment on these pull requests:
https://github.com/DataTorrent/docs/pull/2
https://github.com/DataTorrent/docs/pull/3

Chandni


On Wed, Oct 28, 2015 at 11:16 AM, Chandni Singh <[email protected]>
wrote:

> HI All,
>
> We have created tutorials for FileSplitter and BlockReader here:
>
> https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
>
> https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md
>
> Please have a look. Any feedback is appreciated.
>
> Thanks,
> Chandni
>
> On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <[email protected]>
> wrote:
>
>> Hi Vk,
>>
>> Please find  a CSV block reader here and let me know if you have
>> questions. I have also added a test and it seems to be working fine.
>>
>>
>> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
>>
>> Please note that the BlockReader api has changed from the one you have
>> been using considerably.
>>
>> Thanks,
>> Chandni
>>
>> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <[email protected]>
>> wrote:
>>
>>> Hi vk,
>>>
>>> I think you don't need to override readBlock() in AbstractBlockReader.
>>>
>>> A simpler way to do this will be using ReadAheadLineReaderContext as the
>>> readerContext and providing implementation of converting bytes to the CSV
>>> bean.
>>>
>>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
>>> {
>>>   public CSVBeanReader()
>>>   {
>>>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>>>   }
>>>
>>>   @Override protected CSVBean convertToRecord(byte[] bytes)
>>>   {
>>>     //TODO: convert bytes to bean
>>>     return new CSVBean(bytes);
>>>   }
>>> }
>>>
>>> Are you using supercsv? I think there is a way to convert bytes to a CSV
>>> Record using it and I may have that example somewhere which I will look up
>>> and let you know.
>>>
>>> Chandni
>>>
>>>
>>>
>>> On Tue, Sep 29, 2015 at 2:06 PM, vk <[email protected]>
>>> wrote:
>>>
>>>>  Here is a detailed description of the problem.
>>>>
>>>>>
>>>>> My file size : *7,590,177  bytes*
>>>>>
>>>>> FIle splitter block size config :
>>>>>
>>>>>
>>>>>  
>>>>> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>>>>  <value>*16806*</value>
>>>>>
>>>>>
>>>>> *MyBlockReader Implementation:*
>>>>>
>>>>> @Override
>>>>> protected void readBlock(BlockMetadata blockMetadata) throws
>>>>> IOException {
>>>>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>>>> ReaderContext.Entity entity;
>>>>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
>>>>> encoding),csvPreference);
>>>>> while ((entity = readerContext.next()) != null) {
>>>>>
>>>>> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>>>> Packages record = convertToRecord(entity.getRecord());
>>>>> //if record is partial, ignore the record
>>>>> if (record != null) {
>>>>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>>>> data.emit(record);
>>>>> }
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>> @Override
>>>>> protected Packages convertToRecord(byte[] data) {
>>>>> Packages bean = null;
>>>>> try {
>>>>> bean = csvReader.read(Packages.class,Packages.COLS);
>>>>> } catch (IOException e) {
>>>>> e.printStackTrace();
>>>>> }
>>>>> return bean;
>>>>> }
>>>>>
>>>>>
>>>>> Based on the above, when blocks are created a record might be split
>>>>> into two different blocks. When reading the blocks and converting them to
>>>>> beans, it has to set the offset values appropriately to merge the split
>>>>> record into one and process it. It looks like this implementation
>>>>> is already handled in the API when *readerContext.initialize(stream,
>>>>> blockMetadata, consecutiveBlock)* is called, but when tried to
>>>>> execute with the above snippet, the following error is thrown because of
>>>>> the split record. Can you please suggest?
>>>>>
>>>>> *Exception:*
>>>>>
>>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>>>>> engine.StreamingContainer run - Operator set
>>>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>>>>> 0,
>>>>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>>>>> stopped running due to an exception.
>>>>> *java.lang.IllegalArgumentException: the nameMapping array and the
>>>>> number of columns read should be the same size (nameMapping length = 24,
>>>>> columns = 5)*
>>>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>>>> at
>>>>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>>>> at
>>>>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>>>> at
>>>>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>>>> at
>>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>>>> at
>>>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>>>
>>>>>
>>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>>>
>>>>>
>>>>> Routing to dev@apex
>>>>>
>>>>> Amol
>>>>>
>>>>>
>>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:
>>>>>
>>>>>> Hi Pramod,
>>>>>>
>>>>>> thanks for the reply, it is working..
>>>>>>
>>>>>> And i have one more query on it, How to decide the block size?
>>>>>>
>>>>>> as per my understanding the
>>>>>>
>>>>>> noofBlocks=filesize / blocksize
>>>>>>
>>>>>> By this some records may be split into two blocks, when converting
>>>>>> the record we dont have the complete data in one block.
>>>>>>
>>>>>> how to handle this?
>>>>>>
>>>>>> thanks in adavance.
>>>>>>
>>>>>> Thanks -Chiru
>>>>>>
>>>>>>
>>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I would like to read a large file using filesplitter and emit
>>>>>>> tuples.So i have writtent the code like below.
>>>>>>>
>>>>>>>
>>>>>>> public class Reader extends
>>>>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>>>>>
>>>>>>> @Override
>>>>>>> protected Data convertToRecord(byte[] data)  { ///
>>>>>>> }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> In my application class  i have created the object for filesplitter
>>>>>>> and Reader classes and connect through stream.
>>>>>>>
>>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>>>>> reader.blocksMetadataInput)
>>>>>>>
>>>>>>>
>>>>>>> In properties file iam passing the directory path
>>>>>>> like 
>>>>>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>>>>
>>>>>>> when i run the application iam getting the below error:
>>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>>>>> 0,
>>>>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>>>>> due to setup failure.*
>>>>>>> *java.lang.IllegalArgumentException: empty files*
>>>>>>>
>>>>>>>
>>>>>>> Please suggest is my approach is correct or not?
>>>>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>>>>>>> share any sample code line.
>>>>>>>
>>>>>>> thanks _Chiranjeevi
>>>>>>>
>>>>>> --
>>>>>> You received this message because you are subscribed to the Google
>>>>>> Groups "Malhar" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>> send an email to [email protected].
>>>>>> To post to this group, send email to [email protected].
>>>>>> Visit this group at http://groups.google.com/group/malhar-users.
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>
>>>>>
>>>>>
>>>
>>
>

Reply via email to