Here is a detailed description of the problem.
>
> My file size: *7,590,177 bytes*
>
> File splitter block size config:
>
> <property>
>   <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>   <value>16806</value>
> </property>
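>
> (If my understanding is right, with this configuration the file splits into ceil(7,590,177 / 16,806) = 452 blocks, the last one only 10,671 bytes, so CSV records can easily straddle a block boundary.)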
>
>
> *MyBlockReader Implementation:*
>
> @Override
> protected void readBlock(BlockMetadata blockMetadata) throws IOException {
>   readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>   ReaderContext.Entity entity;
>   csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
>   while ((entity = readerContext.next()) != null) {
>     counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>     Packages record = convertToRecord(entity.getRecord());
>     // if record is partial, ignore the record
>     if (record != null) {
>       counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>       data.emit(record);
>     }
>   }
> }
>
>
> @Override
> protected Packages convertToRecord(byte[] data) {
>   Packages bean = null;
>   try {
>     bean = csvReader.read(Packages.class, Packages.COLS);
>   } catch (IOException e) {
>     e.printStackTrace();
>   }
>   return bean;
> }
>
>
> Based on the above, when blocks are created a record might be split across
> two different blocks. When reading the blocks and converting them to beans,
> the reader has to adjust the offsets appropriately so that the split record
> is merged back into one and processed. It looks like this is already handled
> by the API when *readerContext.initialize(stream, blockMetadata,
> consecutiveBlock)* is called, but when the above snippet is executed, the
> following error is thrown because of the split record. Can you please
> suggest a fix? (A possible direction is sketched after the exception below.)
>
> *Exception:*
>
> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR engine.StreamingContainer run - Operator set
> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
> stopped running due to an exception.
> *java.lang.IllegalArgumentException: the nameMapping array and the number of columns read should be the same size (nameMapping length = 24, columns = 5)*
> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
> at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
> at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
> at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
> at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
> at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
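>
> Would parsing each record from the bytes returned by the reader context, instead of reading from the shared block stream, be the right direction? A minimal sketch of what I mean (the ByteArrayInputStream wrapping and the per-record CsvBeanReader are my own assumptions, and header-row handling is ignored; Packages, Packages.COLS, encoding and csvPreference are the same as in the snippet above):
>
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.io.InputStreamReader;
>
> import org.supercsv.io.CsvBeanReader;
>
> @Override
> protected Packages convertToRecord(byte[] data) {
>   // 'data' is one record as handed back by readerContext.next(); a record that
>   // straddled a block boundary should already be merged into a single line here.
>   try (CsvBeanReader lineReader = new CsvBeanReader(
>       new InputStreamReader(new ByteArrayInputStream(data), encoding), csvPreference)) {
>     return lineReader.read(Packages.class, Packages.COLS);
>   } catch (IOException e) {
>     // partial or unparseable record: return null so readBlock() ignores it
>     return null;
>   }
> }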
>
>
On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>
>
> Routing to dev@apex
>
> Amol
>
>
> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <[email protected]> wrote:
>
>> Hi Pramod,
>>
>> Thanks for the reply, it is working now.
>>
>> I have one more query: how do I decide the block size?
>>
>> As per my understanding,
>>
>> noOfBlocks = fileSize / blockSize
>>
>> With this, some records may be split across two blocks, so when converting a
>> record we do not have its complete data in one block.
>>
>> How should this be handled?
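>>
>> I assume the block count is really an integer ceiling, i.e. something like:
>>
>> long noOfBlocks = (fileSize + blockSize - 1) / blockSize;  // ceiling division; the last block can be shorter than blockSize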
>>
>> Thanks in advance.
>>
>> Thanks -Chiru
>>
>>
>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>
>>> Hi All,
>>>
>>> I would like to read a large file using FileSplitter and emit tuples, so I
>>> have written the code like below.
>>>
>>>
>>> public class Reader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>
>>> {
>>>   @Override
>>>   protected Data convertToRecord(byte[] data) {
>>>     ///
>>>   }
>>> }
>>>
>>>
>>> In my application class I have created objects for the FileSplitter and
>>> Reader classes and connected them through a stream:
>>>
>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
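>>>
>>> For reference, a minimal application class along these lines would look roughly like the following (sketch only; the FileSplitter package path is assumed from malhar-library 3.1.0, and Reader/Data are the classes from the snippet above):
>>>
>>> import org.apache.hadoop.conf.Configuration;
>>>
>>> import com.datatorrent.api.DAG;
>>> import com.datatorrent.api.StreamingApplication;
>>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>>> import com.datatorrent.lib.io.fs.FileSplitter;
>>>
>>> @ApplicationAnnotation(name = "MyFirstApplication")
>>> public class Application implements StreamingApplication
>>> {
>>>   @Override
>>>   public void populateDAG(DAG dag, Configuration conf)
>>>   {
>>>     // operator names must match the ones used in the property-file keys
>>>     FileSplitter fileSplitter = dag.addOperator("fileSplitter", new FileSplitter());
>>>     Reader reader = dag.addOperator("reader", new Reader());
>>>
>>>     // block metadata produced by the splitter drives the block reader
>>>     dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
>>>   }
>>> }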
>>>
>>>
>>> In the properties file I am passing the directory path like
>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
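>>>
>>> i.e. a full property entry along these lines (the path value is only an illustration):
>>>
>>> <property>
>>>   <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
>>>   <value>/path/to/input/directory</value>
>>> </property>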
>>>
>>> When I run the application I am getting the below error:
>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR engine.StreamingContainer run - Abandoning deployment of operator
>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>> due to setup failure.*
>>> *java.lang.IllegalArgumentException: empty files*
>>>
>>>
>>> Please suggest whether my approach is correct, and how to read data using
>>> FileSplitter from malhar-library-3.1.0.jar. Any sample code would help.
>>>
>>> Thanks - Chiranjeevi
>>>