Hi Vk,

Please find a CSV block reader here and let me know if you have questions. I
have also added a test and it seems to be working fine.

https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader

Please note that the BlockReader API has changed considerably from the one you
have been using.

Thanks,
Chandni
On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <[email protected]> wrote:

> Hi vk,
>
> I think you don't need to override readBlock() in AbstractBlockReader.
>
> A simpler way to do this is to use ReadAheadLineReaderContext as the
> readerContext and provide an implementation that converts the bytes to the
> CSV bean:
>
> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
> {
>   public CSVBeanReader()
>   {
>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>   }
>
>   @Override
>   protected CSVBean convertToRecord(byte[] bytes)
>   {
>     //TODO: convert bytes to bean
>     return new CSVBean(bytes);
>   }
> }
>
> Are you using supercsv? I think there is a way to convert bytes to a CSV
> record using it; I may have that example somewhere, and I will look it up
> and let you know.
>
> Chandni
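A minimal sketch of how that TODO could be filled in with supercsv, for
reference. CSVBean and the COLS name mapping are placeholders (CSVBean is
assumed to be a plain bean with a setter per column), and it assumes that each
byte[] handed over by ReadAheadLineReaderContext holds exactly one complete
CSV line:

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;

import org.supercsv.io.CsvBeanReader;
import org.supercsv.prefs.CsvPreference;

import com.datatorrent.lib.io.block.AbstractFSBlockReader;
import com.datatorrent.lib.io.block.ReaderContext;

public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
{
  // placeholder column names: the number of entries must match the number of
  // columns in every line (and the bean's setters), otherwise supercsv throws
  // the "nameMapping array and the number of columns" IllegalArgumentException
  private static final String[] COLS = {"id", "name", "value"};

  public CSVBeanReader()
  {
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override
  protected CSVBean convertToRecord(byte[] bytes)
  {
    // the reader context hands over one complete line at a time, so a
    // short-lived CsvBeanReader is created just to map that line onto the bean
    String line = new String(bytes, StandardCharsets.UTF_8);
    try (CsvBeanReader lineReader =
        new CsvBeanReader(new StringReader(line), CsvPreference.STANDARD_PREFERENCE)) {
      return lineReader.read(CSVBean.class, COLS);
    } catch (IOException e) {
      throw new RuntimeException("failed to parse line: " + line, e);
    }
  }
}

The point of this shape is that parsing works on the line bytes emitted by the
reader context rather than on the block's raw stream (as the readBlock()
override further down the thread does), so a record that is split across two
blocks is dealt with by the context before convertToRecord() ever sees it.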
> On Tue, Sep 29, 2015 at 2:06 PM, vk <[email protected]> wrote:
>
>> Here is a detailed description of the problem.
>>
>>> My file size: 7,590,177 bytes
>>>
>>> FileSplitter block size config:
>>>
>>> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>> <value>16806</value>
>>>
>>> MyBlockReader implementation:
>>>
>>> @Override
>>> protected void readBlock(BlockMetadata blockMetadata) throws IOException
>>> {
>>>   readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>>   ReaderContext.Entity entity;
>>>   csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
>>>   while ((entity = readerContext.next()) != null) {
>>>     counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>>     Packages record = convertToRecord(entity.getRecord());
>>>     // if record is partial, ignore the record
>>>     if (record != null) {
>>>       counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>>       data.emit(record);
>>>     }
>>>   }
>>> }
>>>
>>> @Override
>>> protected Packages convertToRecord(byte[] data) {
>>>   Packages bean = null;
>>>   try {
>>>     bean = csvReader.read(Packages.class, Packages.COLS);
>>>   } catch (IOException e) {
>>>     e.printStackTrace();
>>>   }
>>>   return bean;
>>> }
>>>
>>> Based on the above, when blocks are created a record might be split across
>>> two different blocks. When reading the blocks and converting them to beans,
>>> the reader has to set the offset values appropriately to merge the split
>>> record into one and process it. It looks like this is already handled in
>>> the API when readerContext.initialize(stream, blockMetadata,
>>> consecutiveBlock) is called, but when I execute the snippet above, the
>>> following error is thrown because of the split record. Can you please
>>> suggest?
>>>
>>> Exception:
>>>
>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR engine.StreamingContainer run - Operator set [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]] stopped running due to an exception.
>>> java.lang.IllegalArgumentException: the nameMapping array and the number of columns read should be the same size (nameMapping length = 24, columns = 5)
>>>   at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>>   at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>>   at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>>   at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>>   at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>>   at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>
>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>
>>> Routing to dev@apex
>>>
>>> Amol
>>>
>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:
>>>
>>>> Hi Pramod,
>>>>
>>>> Thanks for the reply, it is working.
>>>>
>>>> I have one more query: how do I decide the block size?
>>>>
>>>> As per my understanding,
>>>>
>>>> noOfBlocks = fileSize / blockSize
>>>>
>>>> With this, some records may be split across two blocks, so when converting
>>>> a record we don't have its complete data in one block. How do I handle
>>>> this?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Thanks,
>>>> Chiru
>>>>
>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I would like to read a large file using FileSplitter and emit tuples, so
>>>>> I have written code like the following:
>>>>>
>>>>> public class Reader extends
>>>>>     AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data> {
>>>>>
>>>>>   @Override
>>>>>   protected Data convertToRecord(byte[] data) {
>>>>>     // ...
>>>>>   }
>>>>> }
>>>>>
>>>>> In my application class I have created objects for the FileSplitter and
>>>>> Reader classes and connected them through a stream:
>>>>>
>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
>>>>>
>>>>> In the properties file I am passing the directory path, like:
>>>>>
>>>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>>
>>>>> When I run the application I get the error below:
>>>>>
>>>>> 2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR engine.StreamingContainer run - Abandoning deployment of operator OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]] due to setup failure.
>>>>> java.lang.IllegalArgumentException: empty files
>>>>>
>>>>> Please suggest whether my approach is correct, and how to read data with
>>>>> FileSplitter using malhar-library-3.1.0.jar. Please share any sample
>>>>> code.
>>>>>
>>>>> Thanks,
>>>>> Chiranjeevi
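To make the wiring in that original message concrete, here is a minimal
application sketch. It assumes the com.datatorrent.lib.io.fs.FileSplitter
shipped with malhar-library 3.1 and the Reader class shown above; the
application name, class names, and operator names are illustrative only:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.fs.FileSplitter;

@ApplicationAnnotation(name = "MyFirstApplication")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // splits the files in the configured input directory into blocks; the
    // directory and block size come from the property keys quoted earlier
    // in the thread
    FileSplitter fileSplitter = dag.addOperator("fileSplitter", new FileSplitter());

    // opens each block and emits one tuple per CSV record
    Reader reader = dag.addOperator("reader", new Reader());

    // block metadata flows from the splitter to the reader
    dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
  }
}

Note that the names passed to addOperator must match the operator names used
in the dt.application.<app>.operator.<name>.prop.* keys, otherwise settings
such as the directory path and blockSize never reach the operators.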
