Santlal,

What version of Parquet are you using? I think this was recently fixed by
Reuben.
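
If it helps to confirm which Parquet jar the job actually loads, a minimal
sketch along these lines could be run with the job's classpath. It uses only
standard JDK calls. `JarVersionCheck` is a hypothetical helper name, and the
default class name is taken from the stack trace quoted below. The version is
reported only if the jar's manifest declares an Implementation-Version;
otherwise it prints null, and the jar file name in the location is the better
hint.

```java
import java.security.CodeSource;

public class JarVersionCheck {

    /** Describes which jar a class was loaded from and the version its manifest declares. */
    static String describe(Class<?> cls) {
        Package pkg = cls.getPackage();
        // Null when the manifest has no Implementation-Version entry.
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        // Null code source means the class came from the bootstrap loader.
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        String location = (src == null) ? "bootstrap/unknown" : src.getLocation().toString();
        return cls.getName() + " version=" + version + " location=" + location;
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace; pass another name as the first argument.
        String name = args.length > 0 ? args[0] : "parquet.hadoop.ParquetFileReader";
        try {
            System.out.println(describe(Class.forName(name)));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Running it on the cluster side as well as locally may show whether the two
environments pick up different Parquet jars.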

rb

On Tue, Feb 16, 2016 at 5:16 AM, Santlal J Gupta <
[email protected]> wrote:

> Hi,
>
> I am facing a problem while using *HashJoin* with an input that uses
> *ParquetTupleScheme*. I have two source taps: one uses the
> *TextDelimited* scheme and the other uses *ParquetTupleScheme*. I perform
> a *HashJoin* and write the result as a delimited file. The program runs
> successfully in local mode, but when I run it on the cluster it fails with
> the following error:
>
> parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://Hostname:8020/user/username/testData/lookup-file.parquet
>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:211)
>         at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:144)
>         at parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:91)
>         at parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:42)
>         at cascading.tap.hadoop.io.MultiRecordReaderIterator.makeReader(MultiRecordReaderIterator.java:123)
>         at cascading.tap.hadoop.io.MultiRecordReaderIterator.getNextReader(MultiRecordReaderIterator.java:172)
>         at cascading.tap.hadoop.io.MultiRecordReaderIterator.hasNext(MultiRecordReaderIterator.java:133)
>         at cascading.tuple.TupleEntrySchemeIterator.<init>(TupleEntrySchemeIterator.java:94)
>         at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:49)
>         at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:44)
>         at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:439)
>         at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:108)
>         at cascading.flow.stream.element.SourceStage.map(SourceStage.java:82)
>         at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
>         at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:139)
>         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
>         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
> Caused by: java.lang.NullPointerException
>         at parquet.hadoop.util.counters.mapred.MapRedCounterAdapter.increment(MapRedCounterAdapter.java:34)
>         at parquet.hadoop.util.counters.BenchmarkCounter.incrementTotalBytes(BenchmarkCounter.java:75)
>         at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:114)
>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:191)
>         ... 21 more
>
> *Below is the use case:*
>
>     public static void main(String[] args) throws IOException {
>
>         Configuration conf = new Configuration();
>         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
>
>         String argsString = "";
>         for (String arg : otherArgs) {
>             argsString = argsString + " " + arg;
>         }
>         System.out.println("After processing arguments are:" + argsString);
>
>         Properties properties = new Properties();
>         properties.putAll(conf.getValByRegex(".*"));
>
>         String outputPath = "testData/BasicEx_Output";
>         Class[] types1 = { String.class, String.class, String.class };
>         Fields f1 = new Fields("id1", "city1", "state");
>
>         Tap source = new Hfs(new TextDelimited(f1, "|", "", types1, false), "main-txt-file.dat");
>         Pipe pipe = new Pipe("ReadWrite");
>
>         Scheme pScheme = new ParquetTupleScheme();
>         Tap source2 = new Hfs(pScheme, "testData/lookup-file.parquet");
>         Pipe pipe2 = new Pipe("ReadWrite2");
>
>         Pipe tokenPipe = new HashJoin(pipe, new Fields("id1"), pipe2, new Fields("id"), new LeftJoin());
>
>         Tap sink = new Hfs(new TextDelimited(f1, true, "|"), outputPath, SinkMode.REPLACE);
>
>         FlowDef flowDef1 = FlowDef.flowDef()
>                 .addSource(pipe, source)
>                 .addSource(pipe2, source2)
>                 .addTailSink(tokenPipe, sink);
>         new Hadoop2MR1FlowConnector(properties).connect(flowDef1).complete();
>
>     }
>
>
> I have attached the input files for reference. Please help me solve
> this issue.
>
> I asked the same question on the Cascading Google group, and below is
> the response:
>
> *André Kelpe*
>
> This looks like a bug caused by a wrong assumption in parquet. I fixed
> a similar thing 2 years ago in parquet:
> https://github.com/Parquet/parquet-mr/pull/388/ Can you check with the
> upstream project? It looks like it is their problem and not a problem
> in Cascading.
>
> - André
>
>
>
> --
> André Kelpe
> [email protected]
> http://concurrentinc.com
>
>
>
>
>
> Thanks
>
> Santlal
>
>
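
For readers following the trace: the NullPointerException is raised inside
MapRedCounterAdapter.increment, which would be consistent with an adapter
forwarding to a counter that was never supplied. The sketch below is not
Parquet's code and does not claim to show the actual fix. All class names in it
are hypothetical, and it only illustrates the general pattern of an adapter
that assumes a non-null counter next to one that tolerates a missing counter.

```java
public class NullSafeCounterSketch {

    /** Hypothetical stand-in for a framework-provided counter. */
    interface Counter {
        void increment(long amount);
        long value();
    }

    /** Trivial in-memory counter used for the demonstration. */
    static final class SimpleCounter implements Counter {
        private long total;
        @Override public void increment(long amount) { total += amount; }
        @Override public long value() { return total; }
    }

    /** Assumes the wrapped counter always exists; throws NPE when it does not. */
    static final class UncheckedAdapter {
        private final Counter wrapped;
        UncheckedAdapter(Counter wrapped) { this.wrapped = wrapped; }
        void increment(long amount) { wrapped.increment(amount); }
    }

    /** Drops the update when the framework supplied no counter. */
    static final class NullSafeAdapter {
        private final Counter wrapped;
        NullSafeAdapter(Counter wrapped) { this.wrapped = wrapped; }
        void increment(long amount) {
            if (wrapped != null) {
                wrapped.increment(amount);
            }
        }
    }

    public static void main(String[] args) {
        SimpleCounter counter = new SimpleCounter();
        new NullSafeAdapter(counter).increment(128);
        System.out.println("counted bytes: " + counter.value());

        // No counter available: the null-safe adapter is a no-op.
        new NullSafeAdapter(null).increment(128);

        try {
            new UncheckedAdapter(null).increment(128);
        } catch (NullPointerException e) {
            System.out.println("unchecked adapter failed with NullPointerException");
        }
    }
}
```

Whether the reader in this job hits exactly this situation depends on the
Parquet version in use, which is why the version matters here.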



-- 
Ryan Blue
Software Engineer
Netflix
