Thanks, Chandni.

On Wed, Oct 28, 2015 at 11:31 AM, Chandni Singh <[email protected]> wrote:
Folks,

You can comment on these pull requests:
https://github.com/DataTorrent/docs/pull/2
https://github.com/DataTorrent/docs/pull/3

Chandni

On Wed, Oct 28, 2015 at 11:16 AM, Chandni Singh <[email protected]> wrote:

Hi All,

We have created tutorials for FileSplitter and BlockReader here:

https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md

Please have a look. Any feedback is appreciated.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <[email protected]> wrote:

Hi vk,

Please find a CSV block reader here and let me know if you have questions. I have also added a test, and it seems to be working fine.

https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader

Please note that the BlockReader API has changed considerably from the one you have been using.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <[email protected]> wrote:

Hi vk,

I don't think you need to override readBlock() in AbstractBlockReader.

A simpler way to do this is to use ReadAheadLineReaderContext as the readerContext and provide an implementation that converts the bytes of each line into the CSV bean:

public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
{
  public CSVBeanReader()
  {
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override
  protected CSVBean convertToRecord(byte[] bytes)
  {
    //TODO: convert bytes to bean
    return new CSVBean(bytes);
  }
}

Are you using SuperCSV? I think there is a way to convert bytes to a CSV record with it. I may have that example somewhere; I will look it up and let you know.

Chandni
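As a rough sketch of that SuperCSV route (the CSVBean class and its COLS column mapping below are hypothetical placeholders, not part of the Malhar or SuperCSV API), convertToRecord can parse each line it is handed in isolation:

import java.io.IOException;
import java.io.StringReader;

import org.supercsv.io.CsvBeanReader;
import org.supercsv.prefs.CsvPreference;

public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
{
  // Hypothetical column-to-property mapping; must match CSVBean's setters.
  private static final String[] COLS = {"id", "name", "value"};

  public CSVBeanReader()
  {
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override
  protected CSVBean convertToRecord(byte[] bytes)
  {
    // The read-ahead line reader context hands over one complete line at a
    // time, so a short-lived CsvBeanReader over just these bytes is safe.
    try (CsvBeanReader csvReader = new CsvBeanReader(
        new StringReader(new String(bytes)), CsvPreference.STANDARD_PREFERENCE)) {
      return csvReader.read(CSVBean.class, COLS);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}

Creating a reader per line is simple and correct; if the per-line overhead ever matters, the parsing could be hoisted into a custom reader context instead.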
On Tue, Sep 29, 2015 at 2:06 PM, vk <[email protected]> wrote:

Here is a detailed description of the problem.

My file size: 7,590,177 bytes

FileSplitter block size config:

<name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
<value>16806</value>

MyBlockReader implementation:

@Override
protected void readBlock(BlockMetadata blockMetadata) throws IOException
{
  readerContext.initialize(stream, blockMetadata, consecutiveBlock);
  ReaderContext.Entity entity;
  csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
  while ((entity = readerContext.next()) != null) {
    counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
    Packages record = convertToRecord(entity.getRecord());
    // if record is partial, ignore the record
    if (record != null) {
      counters.getCounter(ReaderCounterKeys.RECORDS).increment();
      data.emit(record);
    }
  }
}

@Override
protected Packages convertToRecord(byte[] data)
{
  Packages bean = null;
  try {
    bean = csvReader.read(Packages.class, Packages.COLS);
  } catch (IOException e) {
    e.printStackTrace();
  }
  return bean;
}

Based on the above, when blocks are created, a record might be split across two different blocks. When reading the blocks and converting them to beans, the reader has to adjust the offsets appropriately to merge the split record into one and process it. It looks like this is already handled in the API when readerContext.initialize(stream, blockMetadata, consecutiveBlock) is called, but when I execute the snippet above, the following error is thrown because of the split record. Can you please suggest?

Exception:

2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR engine.StreamingContainer run - Operator set [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]] stopped running due to an exception.
java.lang.IllegalArgumentException: the nameMapping array and the number of columns read should be the same size (nameMapping length = 24, columns = 5)
    at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
    at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
    at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
    at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
    at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
    at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
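The likely cause of this exception is that readBlock above wraps the raw block stream in a CsvBeanReader, so SuperCSV reads straight past the line boundaries that readerContext manages and sees a truncated line at the block edge (5 columns instead of the 24 in the mapping). Following Chandni's suggestion earlier in the thread, a minimal sketch of a fix, reusing the Packages bean and the encoding and csvPreference fields from the snippet above, is to drop the readBlock override entirely and parse only the bytes handed to convertToRecord:

@Override
protected Packages convertToRecord(byte[] data)
{
  // ReadAheadLineReaderContext reassembles a line that straddles a block
  // boundary before handing it over, so these bytes are one complete record.
  try (CsvBeanReader lineReader = new CsvBeanReader(
      new StringReader(new String(data, encoding)), csvPreference)) {
    return lineReader.read(Packages.class, Packages.COLS);
  } catch (IOException e) {
    return null; // an unparseable line is treated as a bad record and skipped
  }
}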
On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:

Routing to dev@apex

Amol

On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:

Hi Pramod,

Thanks for the reply; it is working.

I have one more query: how do I decide the block size? As per my understanding,

noOfBlocks = fileSize / blockSize

With this, some records may be split across two blocks, so when converting a record we don't have its complete data in one block. How should this be handled?

Thanks in advance,
Chiru

On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:

Hi All,

I would like to read a large file using FileSplitter and emit tuples, so I have written code like this:

public class Reader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>
{
  @Override
  protected Data convertToRecord(byte[] data) { /// }
}

In my application class I have created the objects for the FileSplitter and Reader classes and connected them through a stream:

dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);

In the properties file I am passing the directory path like:

<name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>

When I run the application I get the following error:

2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR engine.StreamingContainer run - Abandoning deployment of operator OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]] due to setup failure.
java.lang.IllegalArgumentException: empty files

Please suggest whether my approach is correct. How can I read data with FileSplitter using malhar-library-3.1.0.jar? Please share any sample code.

Thanks,
Chiranjeevi
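For reference, a minimal sketch of the application wiring Chiru describes, assuming FileSplitter is com.datatorrent.lib.io.fs.FileSplitter as shipped in malhar-library-3.1.0 and Reader is the class above (operator and stream names follow the property keys used in this thread):

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.fs.FileSplitter;

@ApplicationAnnotation(name = "MyFirstApplication")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // directoryPath (and optionally blockSize) are set in the properties file,
    // e.g. dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath
    FileSplitter fileSplitter = dag.addOperator("fileSplitter", new FileSplitter());
    Reader reader = dag.addOperator("reader", new Reader());

    // Block metadata flows from the splitter to the reader, which opens each
    // block and emits one tuple per complete line.
    dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
  }
}

On the block-size question: as vk's message above notes, a record that crosses a block boundary is stitched back together by the read-ahead reader context (the reader of the first block reads past its end to finish the line, and the reader of the next block skips the leading partial line), so blockSize can be chosen for parallelism rather than for record alignment.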
