Marcelo Vanzin created SPARK-16632:
--------------------------------------

             Summary: Vectorized parquet reader fails to read certain fields from Hive tables
                 Key: SPARK-16632
                 URL: https://issues.apache.org/jira/browse/SPARK-16632
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.0
         Environment: Hive 1.1 (CDH)
            Reporter: Marcelo Vanzin


The vectorized parquet reader fails to read certain tables created by Hive. When a table has columns of type "tinyint" or "smallint", Catalyst maps those to "ByteType" and "ShortType" respectively, but when Hive writes such tables in parquet format, the parquet schema in the files contains plain "int32" fields.

To reproduce, run these commands in the hive shell (or beeline):

{code}
create table abyte (value tinyint) stored as parquet;
create table ashort (value smallint) stored as parquet;
insert into abyte values (1);
insert into ashort values (1);
{code}

Then query them with Spark 2.0:

{code}
spark.sql("select * from abyte").show();
spark.sql("select * from ashort").show();
{code}

You'll see this exception (for the byte case):

{noformat}
16/07/13 12:24:23 ERROR datasources.InsertIntoHadoopFsRelationCommand: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, scm-centos71-iqalat-2.gce.cloudera.com): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getByte(OnHeapColumnVector.java:159)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
        ... 8 more
{noformat}

The query works when you point Spark directly at the parquet files (instead of going through the metastore), or when you disable the vectorized parquet reader.
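
A sketch of both workarounds (the warehouse path below is an assumption for illustration; the config key is a real Spark 2.0 setting):

{code}
// 1) Bypass the metastore and read the files directly; the schema then
//    comes from the parquet footers, so the types are consistent:
spark.read.parquet("/user/hive/warehouse/abyte").show()

// 2) Fall back to the non-vectorized parquet reader before querying
//    through the metastore:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.sql("select * from abyte").show()
{code}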

The root cause seems to be that Hive writes these tables with an incomplete parquet schema:

{noformat}
$ parquet-tools schema /tmp/byte.parquet 
message hive_schema {
  optional int32 value;
}
{noformat}

There's no indication that the field is a 32-bit physical type used to store 8-bit values. When ParquetReadSupport reconciles the two schemas, it just keeps whatever is in the parquet file for primitive types (see {{ParquetReadSupport.clipParquetType}}), so the requested schema ends up saying "int32". The vectorized reader, on the other hand, uses the catalyst schema, which comes from the Hive metastore and says the field is a byte; when the generated code calls getByte, the byte array inside "OnHeapColumnVector" is null (it was never allocated, since the column was set up as an int column), and you get the NPE above.
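
For comparison, when Spark itself writes a byte column to parquet, it annotates the 32-bit physical type; a minimal sketch (the schema output below is what I'd expect from parquet-tools, not captured from this repro):

{code}
// Write the same single-byte column with Spark's own parquet writer:
spark.range(1).selectExpr("cast(id as byte) as value")
  .write.parquet("/tmp/spark_byte.parquet")
{code}

{noformat}
message spark_schema {
  optional int32 value (INT_8);
}
{noformat}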

I have tested a small change to {{ParquetReadSupport.clipParquetType}} that fixes this particular issue, but I haven't run any other tests yet, so I'll do that while I wait for others to chime in and maybe tell me that's not the right place to fix this.
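
To make the direction of that change concrete, here is a minimal sketch (not the actual patch) of the kind of adjustment described above: when clipping a primitive parquet type against the catalyst schema, re-annotate a plain int32 that catalyst knows to be a byte or short, so the requested schema and the generated code agree:

{code}
import org.apache.parquet.schema.{OriginalType, PrimitiveType, Type, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.spark.sql.types.{ByteType, DataType, ShortType}

// Hypothetical helper, not Spark's API: re-annotates an unannotated int32
// when the catalyst schema says the column is narrower than 32 bits.
def reannotateInt32(parquetType: Type, catalystType: DataType): Type =
  (parquetType, catalystType) match {
    case (p: PrimitiveType, ByteType)
        if p.getPrimitiveTypeName == INT32 && p.getOriginalType == null =>
      Types.primitive(INT32, p.getRepetition).as(OriginalType.INT_8).named(p.getName)
    case (p: PrimitiveType, ShortType)
        if p.getPrimitiveTypeName == INT32 && p.getOriginalType == null =>
      Types.primitive(INT32, p.getRepetition).as(OriginalType.INT_16).named(p.getName)
    case _ =>
      parquetType // everything else is left to the existing clipping logic
  }
{code}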



