Hi vk,

I think you don't need to override readBlock() in AbstractBlockReader.

A simpler way to do this will be using ReadAheadLineReaderContext as the
readerContext and providing implementation of converting bytes to the CSV
bean.

public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
{
  public CSVBeanReader()
  {
    this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
  }

  @Override protected CSVBean convertToRecord(byte[] bytes)
  {
    //TODO: convert bytes to bean
    return new CSVBean(bytes);
  }
}

Are you using supercsv? I think there is a way to convert bytes to a CSV
Record using it and I may have that example somewhere which I will look up
and let you know.

Chandni



On Tue, Sep 29, 2015 at 2:06 PM, vk <[email protected]> wrote:

>  Here is a detailed description of the problem.
>
>>
>> My file size : *7,590,177  bytes*
>>
>> FIle splitter block size config :
>>
>>
>>  
>> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>  <value>*16806*</value>
>>
>>
>> *MyBlockReader Implementation:*
>>
>> @Override
>> protected void readBlock(BlockMetadata blockMetadata) throws IOException {
>> readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>> ReaderContext.Entity entity;
>> csvReader = new CsvBeanReader(new InputStreamReader(stream,
>> encoding),csvPreference);
>> while ((entity = readerContext.next()) != null) {
>> counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>> Packages record = convertToRecord(entity.getRecord());
>> //if record is partial, ignore the record
>> if (record != null) {
>> counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>> data.emit(record);
>> }
>> }
>> }
>>
>>
>> @Override
>> protected Packages convertToRecord(byte[] data) {
>> Packages bean = null;
>> try {
>> bean = csvReader.read(Packages.class,Packages.COLS);
>> } catch (IOException e) {
>> e.printStackTrace();
>> }
>> return bean;
>> }
>>
>>
>> Based on the above, when blocks are created a record might be split into
>> two different blocks. When reading the blocks and converting them to beans,
>> it has to set the offset values appropriately to merge the split record
>> into one and process it. It looks like this implementation
>> is already handled in the API when *readerContext.initialize(stream,
>> blockMetadata, consecutiveBlock)* is called, but when tried to execute
>> with the above snippet, the following error is thrown because of the split
>> record. Can you please suggest?
>>
>> *Exception:*
>>
>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>> engine.StreamingContainer run - Operator set
>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>> 0,
>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>> stopped running due to an exception.
>> *java.lang.IllegalArgumentException: the nameMapping array and the number
>> of columns read should be the same size (nameMapping length = 24, columns =
>> 5)*
>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>> at
>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>> at
>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>> at
>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>> at
>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>> at
>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>
>>
> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>
>>
>> Routing to dev@apex
>>
>> Amol
>>
>>
>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:
>>
>>> Hi Pramod,
>>>
>>> thanks for the reply, it is working..
>>>
>>> And i have one more query on it, How to decide the block size?
>>>
>>> as per my understanding the
>>>
>>> noofBlocks=filesize / blocksize
>>>
>>> By this some records may be split into two blocks, when converting the
>>> record we dont have the complete data in one block.
>>>
>>> how to handle this?
>>>
>>> thanks in adavance.
>>>
>>> Thanks -Chiru
>>>
>>>
>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>
>>>> Hi All,
>>>>
>>>> I would like to read a large file using filesplitter and emit tuples.So
>>>> i have writtent the code like below.
>>>>
>>>>
>>>> public class Reader extends
>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
>>>>
>>>> @Override
>>>> protected Data convertToRecord(byte[] data)  { ///
>>>> }
>>>>
>>>> }
>>>>
>>>>
>>>> In my application class  i have created the object for filesplitter and
>>>> Reader classes and connect through stream.
>>>>
>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>> reader.blocksMetadataInput)
>>>>
>>>>
>>>> In properties file iam passing the directory path
>>>> like 
>>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>>
>>>> when i run the application iam getting the below error:
>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>> 0,
>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>> due to setup failure.*
>>>> *java.lang.IllegalArgumentException: empty files*
>>>>
>>>>
>>>> Please suggest is my approach is correct or not?
>>>> how to read data using Filesplitter using malhar-library-3.1.0.jar ,
>>>> share any sample code line.
>>>>
>>>> thanks _Chiranjeevi
>>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Malhar" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/malhar-users.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>

Reply via email to