Folks,

You can comment on these pull requests:
https://github.com/DataTorrent/docs/pull/2
https://github.com/DataTorrent/docs/pull/3
Chandni

On Wed, Oct 28, 2015 at 11:16 AM, Chandni Singh <[email protected]> wrote:

Hi All,

We have created tutorials for FileSplitter and BlockReader here:

https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md

Please have a look. Any feedback is appreciated.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <[email protected]> wrote:

Hi Vk,

Please find a CSV block reader here and let me know if you have questions. I have also added a test and it seems to be working fine.

https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader

Please note that the BlockReader API has changed considerably from the one you have been using.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <[email protected]> wrote:

Hi vk,

I think you don't need to override readBlock() in AbstractBlockReader.

A simpler way to do this is to use ReadAheadLineReaderContext as the readerContext and provide the implementation that converts bytes to the CSV bean:

public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
{
  public CSVBeanReader()
  {
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override
  protected CSVBean convertToRecord(byte[] bytes)
  {
    //TODO: convert bytes to bean
    return new CSVBean(bytes);
  }
}

Are you using supercsv? I think there is a way to convert bytes to a CSV record using it. I may have that example somewhere; I will look it up and let you know.

Chandni
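(For reference, a rough, untested sketch of what that supercsv conversion could look like, filling in the TODO above. CSVBean and its COLUMNS name mapping are placeholders, the Malhar package paths are from memory and may differ by version, and each byte[] passed to convertToRecord is assumed to be one complete line, since the read-ahead reader context reassembles records that cross block boundaries.)

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;

import org.supercsv.io.CsvBeanReader;
import org.supercsv.prefs.CsvPreference;

import com.datatorrent.lib.io.block.AbstractFSBlockReader;
import com.datatorrent.lib.io.block.ReaderContext;

public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
{
  public CSVBeanReader()
  {
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override
  protected CSVBean convertToRecord(byte[] bytes)
  {
    // 'bytes' is a single complete CSV line; parse just that line with supercsv.
    String line = new String(bytes, StandardCharsets.UTF_8);
    try (CsvBeanReader lineReader =
        new CsvBeanReader(new StringReader(line), CsvPreference.STANDARD_PREFERENCE)) {
      // COLUMNS is a placeholder name mapping: one entry per CSV column.
      return lineReader.read(CSVBean.class, CSVBean.COLUMNS);
    }
    catch (IOException e) {
      throw new RuntimeException("failed to parse line: " + line, e);
    }
  }
}

Creating a CsvBeanReader per line is not the cheapest option, but it keeps the parsing independent of block boundaries; a header row, if the file has one, would need to be skipped separately.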
On Tue, Sep 29, 2015 at 2:06 PM, vk <[email protected]> wrote:

Here is a detailed description of the problem.

My file size: 7,590,177 bytes

FileSplitter block size config:

<name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
<value>16806</value>

MyBlockReader implementation:

@Override
protected void readBlock(BlockMetadata blockMetadata) throws IOException {
  readerContext.initialize(stream, blockMetadata, consecutiveBlock);
  ReaderContext.Entity entity;
  csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
  while ((entity = readerContext.next()) != null) {
    counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
    Packages record = convertToRecord(entity.getRecord());
    //if record is partial, ignore the record
    if (record != null) {
      counters.getCounter(ReaderCounterKeys.RECORDS).increment();
      data.emit(record);
    }
  }
}

@Override
protected Packages convertToRecord(byte[] data) {
  Packages bean = null;
  try {
    bean = csvReader.read(Packages.class, Packages.COLS);
  } catch (IOException e) {
    e.printStackTrace();
  }
  return bean;
}

Based on the above, when blocks are created, a record might be split across two different blocks. When reading the blocks and converting them to beans, the reader has to set the offsets appropriately so that the split record is merged back into one and processed. It looks like this is already handled by the API when readerContext.initialize(stream, blockMetadata, consecutiveBlock) is called, but when I execute the snippet above, the following error is thrown because of the split record. Can you please suggest?

Exception:

2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR engine.StreamingContainer run - Operator set [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]] stopped running due to an exception.
java.lang.IllegalArgumentException: the nameMapping array and the number of columns read should be the same size (nameMapping length = 24, columns = 5)
        at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
        at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
        at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
        at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
        at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
        at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)

On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:

Routing to dev@apex

Amol

On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:

Hi Pramod,

Thanks for the reply, it is working.

I have one more query: how do I decide the block size? As per my understanding,

noOfBlocks = fileSize / blockSize

With this, some records may be split across two blocks, so when converting a record we don't have its complete data in one block. How do I handle this?

Thanks in advance.

Thanks
-Chiru
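(To make that concrete with the figures quoted earlier in this thread: a 7,590,177-byte file with a blockSize of 16,806 yields 7,590,177 / 16,806, rounded up, = 452 blocks, i.e. 451 full 16,806-byte blocks plus a final block of 10,671 bytes. Any record that straddles one of those boundaries starts in one block and ends in the next; as the replies above explain, the read-ahead reader context initialized in readBlock is what reassembles such a record, so in principle the block size can be chosen for parallelism rather than to align with record boundaries.)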
On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:

Hi All,

I would like to read a large file using FileSplitter and emit tuples, so I have written code like the below.

public class Reader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>
{
  @Override
  protected Data convertToRecord(byte[] data) {
    // ...
  }
}

In my application class I have created the objects for the FileSplitter and Reader classes and connected them through a stream:

dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput)

In the properties file I am passing the directory path like this:

<name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>

When I run the application I get the error below:

2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR engine.StreamingContainer run - Abandoning deployment of operator OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]] due to setup failure.
java.lang.IllegalArgumentException: empty files

Please suggest whether my approach is correct. How can I read data using FileSplitter with malhar-library-3.1.0.jar? Please share any sample code.

Thanks
_Chiranjeevi
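(For the sample-code request above, a minimal, untested application sketch of this wiring, assuming the Malhar 3.1-era com.datatorrent.lib.io.fs.FileSplitter can be used directly and reusing the Reader class and port names from the message above; adjust class and package names to your Malhar version.)

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.fs.FileSplitter;

@ApplicationAnnotation(name = "MyFirstApplication")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // The operator names registered here are the names that the
    // dt.application.MyFirstApplication.operator.<name>.prop.* keys must use.
    FileSplitter fileSplitter = dag.addOperator("fileSplitter", new FileSplitter());
    Reader reader = dag.addOperator("reader", new Reader());

    // Block metadata emitted by the splitter drives the block reader.
    dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
  }
}

Note that the property keys quoted in this thread use both "FileSplitter" and "fileSplitter" as the operator name; whichever spelling is passed to addOperator() is the one the property keys have to match.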
