Hi All,

We have created tutorials for FileSplitter and BlockReader here:

https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md
Please have a look. Any feedback is appreciated.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <[email protected]> wrote:

> Hi Vk,
>
> Please find a CSV block reader here and let me know if you have questions.
> I have also added a test and it seems to be working fine.
>
> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
>
> Please note that the BlockReader API has changed considerably from the one
> you have been using.
>
> Thanks,
> Chandni
>
> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <[email protected]> wrote:
>
>> Hi vk,
>>
>> I think you don't need to override readBlock() in AbstractBlockReader.
>>
>> A simpler way to do this is to use ReadAheadLineReaderContext as the
>> readerContext and provide an implementation that converts the bytes of each
>> record to the CSV bean.
>>
>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
>> {
>>   public CSVBeanReader()
>>   {
>>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>>   }
>>
>>   @Override
>>   protected CSVBean convertToRecord(byte[] bytes)
>>   {
>>     //TODO: convert bytes to bean
>>     return new CSVBean(bytes);
>>   }
>> }
>>
>> Are you using Super CSV? I think there is a way to convert bytes to a CSV
>> record using it. I may have that example somewhere; I will look it up and
>> let you know.
>>
>> Chandni
>>
>> On Tue, Sep 29, 2015 at 2:06 PM, vk <[email protected]> wrote:
>>
>>> Here is a detailed description of the problem.
>>>
>>>> My file size: *7,590,177 bytes*
>>>>
>>>> FileSplitter block size config:
>>>>
>>>> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>>> <value>*16806*</value>
>>>>
>>>> *MyBlockReader implementation:*
>>>>
>>>> @Override
>>>> protected void readBlock(BlockMetadata blockMetadata) throws IOException
>>>> {
>>>>   readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>>>   ReaderContext.Entity entity;
>>>>   csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
>>>>   while ((entity = readerContext.next()) != null) {
>>>>     counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>>>     Packages record = convertToRecord(entity.getRecord());
>>>>     //if record is partial, ignore the record
>>>>     if (record != null) {
>>>>       counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>>>       data.emit(record);
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> @Override
>>>> protected Packages convertToRecord(byte[] data)
>>>> {
>>>>   Packages bean = null;
>>>>   try {
>>>>     bean = csvReader.read(Packages.class, Packages.COLS);
>>>>   } catch (IOException e) {
>>>>     e.printStackTrace();
>>>>   }
>>>>   return bean;
>>>> }
>>>>
>>>> Based on the above, when blocks are created a record might be split
>>>> across two different blocks. When reading the blocks and converting them
>>>> to beans, the reader has to set the offsets appropriately so that the
>>>> split record is merged into one and processed. It looks like this is
>>>> already handled by the API when *readerContext.initialize(stream,
>>>> blockMetadata, consecutiveBlock)* is called, but when the above snippet
>>>> is executed, the following error is thrown because of the split record.
>>>> Can you please suggest?
>>>>
>>>> *Exception:*
>>>>
>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR engine.StreamingContainer run - Operator set
>>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>>>> stopped running due to an exception.
>>>> *java.lang.IllegalArgumentException: the nameMapping array and the
>>>> number of columns read should be the same size (nameMapping length = 24,
>>>> columns = 5)*
>>>>   at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>>>   at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>>>   at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>>>   at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>>>   at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>>>   at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>>
>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>>
>>>> Routing to dev@apex
>>>>
>>>> Amol
>>>>
>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:
>>>>
>>>>> Hi Pramod,
>>>>>
>>>>> Thanks for the reply, it is working.
>>>>>
>>>>> I have one more query: how do I decide the block size?
>>>>>
>>>>> As per my understanding,
>>>>>
>>>>> noOfBlocks = fileSize / blockSize
>>>>>
>>>>> Because of this, some records may be split across two blocks, and when
>>>>> converting such a record we don't have its complete data in one block.
>>>>> How do I handle this?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> Thanks,
>>>>> Chiru
>>>>>
>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I would like to read a large file using FileSplitter and emit tuples,
>>>>>> so I have written code like the below:
>>>>>>
>>>>>> public class Reader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>
>>>>>> {
>>>>>>   @Override
>>>>>>   protected Data convertToRecord(byte[] data) { /// }
>>>>>> }
>>>>>>
>>>>>> In my application class I have created objects for the FileSplitter and
>>>>>> Reader classes and connected them through a stream:
>>>>>>
>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
>>>>>>
>>>>>> In the properties file I am passing the directory path, like
>>>>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>>>
>>>>>> When I run the application I am getting the below error:
>>>>>>
>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR engine.StreamingContainer run - Abandoning deployment of operator
>>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>>>> due to setup failure.*
>>>>>> *java.lang.IllegalArgumentException: empty files*
>>>>>>
>>>>>> Please suggest whether my approach is correct or not.
>>>>>> How do I read data using FileSplitter with malhar-library-3.1.0.jar?
>>>>>> Please share some sample code.
>>>>>>
>>>>>> Thanks,
>>>>>> Chiranjeevi
>>>>>
>>>>> --
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Malhar" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at http://groups.google.com/group/malhar-users.
>>>>> For more options, visit https://groups.google.com/d/optout.
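For reference, a minimal sketch that ties the suggestions in the thread together: keep the stock readBlock() and a ReadAheadLineReaderContext, so records straddling block boundaries are stitched together by the reader context, and parse each complete line with Super CSV inside convertToRecord() instead of wrapping the block's raw stream in a CsvBeanReader as MyBlockReader does above (that wrapping is the likely cause of the "nameMapping length = 24, columns = 5" mismatch quoted above, since the bean reader then sees partial lines at block boundaries). Packages and Packages.COLS are the bean and column mapping from the snippet above; the class name CsvPackagesReader, the UTF-8 charset, and CsvPreference.STANDARD_PREFERENCE are illustrative stand-ins for whatever the application actually uses. This is an untested sketch, not the code from the linked example.

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;

import org.supercsv.io.CsvBeanReader;
import org.supercsv.prefs.CsvPreference;

import com.datatorrent.lib.io.block.AbstractFSBlockReader;
import com.datatorrent.lib.io.block.ReaderContext;

public class CsvPackagesReader extends AbstractFSBlockReader<Packages>
{
  public CsvPackagesReader()
  {
    // The read-ahead line reader context hands convertToRecord() one complete
    // line at a time, even when that line straddles two blocks.
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override
  protected Packages convertToRecord(byte[] bytes)
  {
    // Parse just this one line with Super CSV; do not wrap the block's raw stream.
    try (CsvBeanReader lineReader = new CsvBeanReader(
        new StringReader(new String(bytes, StandardCharsets.UTF_8)),
        CsvPreference.STANDARD_PREFERENCE)) {
      return lineReader.read(Packages.class, Packages.COLS);
    } catch (IOException e) {
      // Returning null lets the operator skip a record it cannot parse.
      return null;
    }
  }
}

In the application it would be wired exactly like the reader in the thread, e.g. dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput).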

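On the block-size question above: the number of blocks is simply the file size divided by the block size, rounded up, and a record that straddles a block boundary needs no special handling when a line reader context such as ReadAheadLineReaderContext is used, because it reads past the end of a block to finish the block's last line and skips the leading partial line of every block except the first. A small worked example using the sizes from vk's mail (illustrative arithmetic only; the class and variable names are made up):

public class BlockSizeExample
{
  public static void main(String[] args)
  {
    long fileSize  = 7_590_177L;  // file size from the thread, in bytes
    long blockSize = 16_806L;     // FileSplitter.prop.blockSize from the thread

    // The FileSplitter emits one block-metadata tuple per block: ceil(fileSize / blockSize).
    long numBlocks = (fileSize + blockSize - 1) / blockSize;  // 452
    long lastBlock = fileSize - (numBlocks - 1) * blockSize;  // 10,671 bytes in the final, shorter block

    System.out.println(numBlocks + " blocks, last block of " + lastBlock + " bytes");
  }
}

Picking a larger block size mainly trades parallelism across block readers for per-block overhead; it does not change how boundary-crossing records are handled.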