Hi,

I am facing a problem when using HashJoin with an input that uses 
ParquetTupleScheme. I have two source taps: one uses the TextDelimited scheme 
and the other uses ParquetTupleScheme. I perform a HashJoin and write the 
result as a delimited file. The program runs successfully in local mode, but 
when I run it on the cluster it fails with the following error:

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://Hostname:8020/user/username/testData/lookup-file.parquet
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:211)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:144)
        at parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:91)
        at parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:42)
        at cascading.tap.hadoop.io.MultiRecordReaderIterator.makeReader(MultiRecordReaderIterator.java:123)
        at cascading.tap.hadoop.io.MultiRecordReaderIterator.getNextReader(MultiRecordReaderIterator.java:172)
        at cascading.tap.hadoop.io.MultiRecordReaderIterator.hasNext(MultiRecordReaderIterator.java:133)
        at cascading.tuple.TupleEntrySchemeIterator.<init>(TupleEntrySchemeIterator.java:94)
        at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:49)
        at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:44)
        at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:439)
        at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:108)
        at cascading.flow.stream.element.SourceStage.map(SourceStage.java:82)
        at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
        at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:139)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NullPointerException
        at parquet.hadoop.util.counters.mapred.MapRedCounterAdapter.increment(MapRedCounterAdapter.java:34)
        at parquet.hadoop.util.counters.BenchmarkCounter.incrementTotalBytes(BenchmarkCounter.java:75)
        at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:114)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:191)
        ... 21 more
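
For reference, the "Caused by" frame points at MapRedCounterAdapter.increment, 
which suggests that the counter wrapped by that adapter was null in this code 
path. That reading is an assumption drawn from the trace alone, not confirmed 
against the Parquet source. Below is a minimal sketch of the kind of null guard 
that avoids this class of failure. All types in it (SketchCounter, 
BackingCounter, NullSafeCounterAdapter) are hypothetical stand-ins written for 
illustration; they are not Parquet's or Hadoop's real classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counter interface, standing in for whatever the reader calls.
interface SketchCounter {
    void increment(long value);
    long getCount();
}

// Hypothetical framework counter; assumed to be unavailable (null) in some contexts.
final class BackingCounter {
    private final AtomicLong total = new AtomicLong();

    void add(long value) {
        total.addAndGet(value);
    }

    long value() {
        return total.get();
    }
}

// Adapter that skips the update when no backing counter was supplied,
// instead of dereferencing a null reference.
final class NullSafeCounterAdapter implements SketchCounter {
    private final BackingCounter adaptee; // may be null

    NullSafeCounterAdapter(BackingCounter adaptee) {
        this.adaptee = adaptee;
    }

    @Override
    public void increment(long value) {
        if (adaptee != null) {
            adaptee.add(value);
        }
    }

    @Override
    public long getCount() {
        return adaptee == null ? 0L : adaptee.value();
    }
}

final class NullSafeCounterSketch {
    public static void main(String[] args) {
        // No backing counter: increment is a no-op rather than an NPE.
        SketchCounter missing = new NullSafeCounterAdapter(null);
        missing.increment(1024L);
        System.out.println("missing counter: " + missing.getCount());

        // Backing counter present: values accumulate as usual.
        SketchCounter present = new NullSafeCounterAdapter(new BackingCounter());
        present.increment(10L);
        present.increment(5L);
        System.out.println("present counter: " + present.getCount());
    }
}
```

Whether silently skipping the update is acceptable depends on how the counters 
are used; the sketch only illustrates the guard, not an actual patch.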

Below is the use case:

    public static void main(String[] args) throws IOException {

        // Let Hadoop consume its generic options first; keep the remainder.
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        String argsString = "";
        for (String arg : otherArgs) {
            argsString = argsString + " " + arg;
        }
        System.out.println("After processing arguments are:" + argsString);

        // Copy the full Hadoop configuration into the Cascading properties.
        Properties properties = new Properties();
        properties.putAll(conf.getValByRegex(".*"));

        String OutputPath = "testData/BasicEx_Output";
        Class types1[] = { String.class, String.class, String.class };
        Fields f1 = new Fields("id1", "city1", "state");

        // Left-hand side of the join: pipe-delimited text file.
        Tap source = new Hfs(new TextDelimited(f1, "|", "", types1, false), "main-txt-file.dat");
        Pipe pipe = new Pipe("ReadWrite");

        // Right-hand side of the join: Parquet lookup file.
        Scheme pScheme = new ParquetTupleScheme();
        Tap source2 = new Hfs(pScheme, "testData/lookup-file.parquet");
        Pipe pipe2 = new Pipe("ReadWrite2");

        // Left join of the text input (id1) against the Parquet input (id).
        Pipe tokenPipe = new HashJoin(pipe, new Fields("id1"), pipe2, new Fields("id"), new LeftJoin());

        // Write the f1 fields as a pipe-delimited file with a header row.
        Tap sink = new Hfs(new TextDelimited(f1, true, "|"), OutputPath, SinkMode.REPLACE);

        FlowDef flowDef1 = FlowDef.flowDef()
                .addSource(pipe, source)
                .addSource(pipe2, source2)
                .addTailSink(tokenPipe, sink);
        new Hadoop2MR1FlowConnector(properties).connect(flowDef1).complete();

    }


I have attached the input files for reference. Please help me solve this 
issue.

I asked the same question on the cascading-user Google group; the response is 
below:

André Kelpe wrote:

This looks like a bug caused by a wrong assumption in parquet. I fixed
a similar thing 2 years ago in parquet:
https://github.com/Parquet/parquet-mr/pull/388/ Can you check with the
upstream project? It looks like it is their problem and not a problem
in Cascading.

- André



--
André Kelpe
[email protected]
http://concurrentinc.com


Thanks
Santlal
