Here is a detailed description of the problem.

>
> My file size: *7,590,177 bytes*
>
> File splitter block size config:
>
> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
> <value>*16806*</value>
>
>
> *MyBlockReader Implementation:*
>
> @Override
> protected void readBlock(BlockMetadata blockMetadata) throws IOException {
>   readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>   ReaderContext.Entity entity;
>   csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
>   while ((entity = readerContext.next()) != null) {
>     counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>     Packages record = convertToRecord(entity.getRecord());
>     // if the record is partial, ignore it
>     if (record != null) {
>       counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>       data.emit(record);
>     }
>   }
> }
>
>
> @Override
> protected Packages convertToRecord(byte[] data) {
>   Packages bean = null;
>   try {
>     bean = csvReader.read(Packages.class, Packages.COLS);
>   } catch (IOException e) {
>     e.printStackTrace();
>   }
>   return bean;
> }
>
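One thing that may be worth checking in the snippet above: convertToRecord ignores its data argument and reads from a CsvBeanReader wrapped around the raw stream, so the CSV parser may not respect the record boundaries that readerContext tracks. That is only a guess from the quoted code. Below is a minimal, stdlib-only sketch of parsing just the bytes handed to the method. RecordParser, parseRecord, and the naive comma split (no quoted-field handling) are hypothetical stand-ins for illustration, not Malhar or Super CSV API.

```java
import java.nio.charset.StandardCharsets;

public class RecordParser {
    // Hypothetical helper: parse the bytes of ONE record and return its columns,
    // or null when the column count does not match (for example a partial record).
    public static String[] parseRecord(byte[] data, int expectedColumns) {
        String line = new String(data, StandardCharsets.UTF_8).trim();
        if (line.isEmpty()) {
            return null;
        }
        // Naive split for illustration only; limit -1 keeps trailing empty columns.
        String[] fields = line.split(",", -1);
        return fields.length == expectedColumns ? fields : null;
    }

    public static void main(String[] args) {
        byte[] full = "id-1,box,,".getBytes(StandardCharsets.UTF_8);
        byte[] partial = "id-2,bo".getBytes(StandardCharsets.UTF_8);
        System.out.println(parseRecord(full, 4) != null);    // complete record
        System.out.println(parseRecord(partial, 4) != null); // partial record is rejected
    }
}
```

In the real operator the same idea would mean building the bean from the byte[] argument (one record at a time) instead of from the shared stream.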
>
> Based on the above, when blocks are created a record might be split across 
> two different blocks. When reading the blocks and converting them to beans, 
> the reader has to set the offsets appropriately so that the split record is 
> merged back into one and processed. It looks like the API already handles 
> this when *readerContext.initialize(stream, blockMetadata, 
> consecutiveBlock)* is called, but when I run the above snippet, the 
> following error is thrown because of the split record. Can you please 
> suggest a fix?
>
> *Exception:*
>
> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR engine.StreamingContainer run - Operator set [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]] stopped running due to an exception.
> *java.lang.IllegalArgumentException: the nameMapping array and the number of columns read should be the same size (nameMapping length = 24, columns = 5)*
>     at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>     at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>     at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>     at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>     at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>     at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>
>
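For reference, the usual way block readers deal with a record that straddles a block boundary can be sketched without any Malhar classes. In the sketch below a line belongs to the block that contains its first byte: a block that starts mid-line skips the tail of that line, and a block whose last line runs past its end reads ahead to finish it. BlockLineReader and readBlock are hypothetical names, the "file" is an in-memory byte array, and this is an illustration of the technique under those assumptions, not the library's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BlockLineReader {
    // Returns every line whose first byte lies in [start, end).
    public static List<String> readBlock(byte[] file, int start, int end) {
        List<String> lines = new ArrayList<>();
        int pos = start;
        // If the block starts in the middle of a line, skip the rest of that line:
        // the previous block owns it and reads past its own end to finish it.
        if (start > 0 && file[start - 1] != '\n') {
            while (pos < file.length && file[pos] != '\n') {
                pos++;
            }
            pos++; // step over the newline
        }
        while (pos < end && pos < file.length) {
            int eol = pos;
            // Read ahead, possibly beyond 'end', until the record is complete.
            while (eol < file.length && file[eol] != '\n') {
                eol++;
            }
            lines.add(new String(file, pos, eol - pos, StandardCharsets.UTF_8));
            pos = eol + 1;
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] file = "a,1\nbb,22\nccc,333\n".getBytes(StandardCharsets.UTF_8);
        int blockSize = 5; // deliberately smaller than a record
        for (int start = 0; start < file.length; start += blockSize) {
            int end = Math.min(start + blockSize, file.length);
            System.out.println("block@" + start + " -> " + readBlock(file, start, end));
        }
    }
}
```

With this ownership rule every record is emitted exactly once, even when the block size is smaller than a record, and some blocks legitimately emit nothing.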
On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>
>
> Routing to dev@apex
>
> Amol
>
>
> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:
>
>> Hi Pramod,
>>
>> Thanks for the reply, it is working.
>>
>> I have one more question: how do I decide the block size?
>>
>> As per my understanding,
>>
>> noOfBlocks = fileSize / blockSize
>>
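A small note on the formula just quoted: when the file size is not an exact multiple of the block size, the division needs rounding up, because the remainder forms one final, shorter block. The sketch below works this through with the file size (7,590,177 bytes) and block size (16806) reported elsewhere in this thread. BlockCount and its two methods are hypothetical helper names for the arithmetic only.

```java
public class BlockCount {
    // Ceiling division: a trailing partial block still counts as a block.
    public static long blockCount(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    // Size of the final block: the remainder, or a full block when it divides evenly.
    public static long lastBlockSize(long fileSize, long blockSize) {
        long remainder = fileSize % blockSize;
        return remainder == 0 ? Math.min(fileSize, blockSize) : remainder;
    }

    public static void main(String[] args) {
        long fileSize = 7_590_177L; // bytes, from the thread
        long blockSize = 16_806L;   // configured blockSize, from the thread
        System.out.println(blockCount(fileSize, blockSize));    // 452
        System.out.println(lastBlockSize(fileSize, blockSize)); // 10671
    }
}
```

So 451 full blocks of 16,806 bytes cover 7,579,506 bytes, and the remaining 10,671 bytes make up block 452.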
>> With this, some records may be split across two blocks, so when converting 
>> a record we do not have its complete data in one block.
>>
>> How should this be handled?
>>
>> Thanks in advance.
>>
>> Thanks -Chiru 
>>
>>
>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>
>>> Hi All,
>>>
>>> I would like to read a large file using FileSplitter and emit tuples, so 
>>> I have written the code below.
>>>
>>>
>>> public class Reader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data> {
>>>
>>>   @Override
>>>   protected Data convertToRecord(byte[] data) { ///
>>>   }
>>>
>>> }
>>>
>>>
>>> In my application class I have created the FileSplitter and Reader 
>>> operators and connected them through a stream:
>>>
>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
>>>
>>>
>>> In the properties file I am passing the directory path like this:
>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>
>>> When I run the application I get the error below:
>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR engine.StreamingContainer run - Abandoning deployment of operator OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]] due to setup failure.*
>>> *java.lang.IllegalArgumentException: empty files*
>>>
>>>
>>> Please let me know whether my approach is correct. 
>>> How do I read data using FileSplitter with malhar-library-3.1.0.jar? 
>>> Please share any sample code.
>>>
>>> Thanks - Chiranjeevi
>>>
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "Malhar" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/malhar-users.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
