Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f93667f84 -> ccc4a2045


[SPARK-23822][SQL] Improve error message for Parquet schema mismatches

## What changes were proposed in this pull request?

This pull request tries to improve the error message Spark reports while 
reading parquet files with different schemas, e.g. one with a STRING column 
and the other with an INT column. A new 
SchemaColumnConvertNotSupportedException is added to replace the old 
UnsupportedOperationException. The exception is then wrapped in 
FileScanRDD.scala to throw a more general QueryExecutionException that 
includes the name of the actual parquet file that triggered the exception.

## How was this patch tested?

Unit tests added to check the new exception and verify the error messages.

Also manually tested with two parquet files with different schemas to check 
the error message.

<img width="1125" alt="screen shot 2018-03-30 at 4 03 04 pm" 
src="https://user-images.githubusercontent.com/37087310/38156580-dd58a140-3433-11e8-973a-b816d859fbe1.png">

Author: Yuchen Huo <yuchen....@databricks.com>

Closes #20953 from yuchenhuo/SPARK-23822.

(cherry picked from commit 94524019315ad463f9bc13c107131091d17c6af9)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccc4a204
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccc4a204
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccc4a204

Branch: refs/heads/branch-2.3
Commit: ccc4a20453bbbaf1f3e5e46fb7c0277f1e6c65b9
Parents: f93667f
Author: Yuchen Huo <yuchen....@databricks.com>
Authored: Fri Apr 6 08:35:20 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Apr 6 08:36:20 2018 -0700

----------------------------------------------------------------------
 ...chemaColumnConvertNotSupportedException.java | 62 ++++++++++++++++++++
 .../parquet/VectorizedColumnReader.java         | 38 ++++++++----
 .../sql/execution/QueryExecutionException.scala |  3 +-
 .../sql/execution/datasources/FileScanRDD.scala | 21 ++++++-
 .../parquet/ParquetSchemaSuite.scala            | 55 +++++++++++++++++
 5 files changed, 166 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ccc4a204/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java
new file mode 100644
index 0000000..82a1169
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * Exception thrown when the parquet reader find column type mismatches.
+ */
+@InterfaceStability.Unstable
+public class SchemaColumnConvertNotSupportedException extends RuntimeException 
{
+
+  /**
+   * Name of the column which cannot be converted.
+   */
+  private String column;
+  /**
+   * Physical column type in the actual parquet file.
+   */
+  private String physicalType;
+  /**
+   * Logical column type in the parquet schema the parquet reader use to parse 
all files.
+   */
+  private String logicalType;
+
+  public String getColumn() {
+    return column;
+  }
+
+  public String getPhysicalType() {
+    return physicalType;
+  }
+
+  public String getLogicalType() {
+    return logicalType;
+  }
+
+  public SchemaColumnConvertNotSupportedException(
+      String column,
+      String physicalType,
+      String logicalType) {
+    super();
+    this.column = column;
+    this.physicalType = physicalType;
+    this.logicalType = logicalType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc4a204/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 47dd625..72f1d02 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.TimeZone;
 
 import org.apache.parquet.bytes.BytesUtils;
@@ -31,6 +32,7 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
@@ -232,6 +234,18 @@ public class VectorizedColumnReader {
   }
 
   /**
+   * Helper function to construct exception for parquet schema mismatch.
+   */
+  private SchemaColumnConvertNotSupportedException 
constructConvertNotSupportedException(
+      ColumnDescriptor descriptor,
+      WritableColumnVector column) {
+    return new SchemaColumnConvertNotSupportedException(
+      Arrays.toString(descriptor.getPath()),
+      descriptor.getType().toString(),
+      column.dataType().toString());
+  }
+
+  /**
    * Reads `num` values into column, decoding the values from `dictionaryIds` 
and `dictionary`.
    */
   private void decodeDictionaryIds(
@@ -261,7 +275,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException("Unimplemented type: " + 
column.dataType());
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
 
@@ -282,7 +296,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException("Unimplemented type: " + 
column.dataType());
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
 
@@ -321,7 +335,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException();
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
       case BINARY:
@@ -360,7 +374,7 @@ public class VectorizedColumnReader {
             }
           }
         } else {
-          throw new UnsupportedOperationException();
+          throw constructConvertNotSupportedException(descriptor, column);
         }
         break;
 
@@ -375,7 +389,9 @@ public class VectorizedColumnReader {
    */
 
   private void readBooleanBatch(int rowId, int num, WritableColumnVector 
column) {
-    assert(column.dataType() == DataTypes.BooleanType);
+    if (column.dataType() != DataTypes.BooleanType) {
+      throw constructConvertNotSupportedException(descriptor, column);
+    }
     defColumn.readBooleans(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   }
@@ -394,7 +410,7 @@ public class VectorizedColumnReader {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + 
column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -414,7 +430,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unsupported conversion to: " + 
column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -425,7 +441,7 @@ public class VectorizedColumnReader {
       defColumn.readFloats(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unsupported conversion to: " + 
column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -436,7 +452,7 @@ public class VectorizedColumnReader {
       defColumn.readDoubles(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + 
column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -471,7 +487,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + 
column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 
@@ -510,7 +526,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Unimplemented type: " + 
column.dataType());
+      throw constructConvertNotSupportedException(descriptor, column);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc4a204/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
index 16806c6..cffd97b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecutionException.scala
@@ -17,4 +17,5 @@
 
 package org.apache.spark.sql.execution
 
-class QueryExecutionException(message: String) extends Exception(message)
+class QueryExecutionException(message: String, cause: Throwable = null)
+  extends Exception(message, cause)

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc4a204/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 835ce98..28c36b6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -21,11 +21,14 @@ import java.io.{FileNotFoundException, IOException}
 
 import scala.collection.mutable
 
+import org.apache.parquet.io.ParquetDecodingException
+
 import org.apache.spark.{Partition => RDDPartition, TaskContext, 
TaskKilledException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
 
@@ -179,7 +182,23 @@ class FileScanRDD(
             currentIterator = readCurrentFile()
           }
 
-          hasNext
+          try {
+            hasNext
+          } catch {
+            case e: SchemaColumnConvertNotSupportedException =>
+              val message = "Parquet column cannot be converted in " +
+                s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+                s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+              throw new QueryExecutionException(message, e)
+            case e: ParquetDecodingException =>
+              if (e.getMessage.contains("Can not read value at")) {
+                val message = "Encounter error while reading parquet files. " +
+                  "One possible cause: Parquet column cannot be converted in 
the " +
+                  "corresponding files. Details: "
+                throw new QueryExecutionException(message, e)
+              }
+              throw e
+          }
         } else {
           currentFile = null
           InputFileBlockHolder.unset()

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc4a204/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 2cd2a60..9d3dfae 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,10 +20,13 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.parquet.io.ParquetDecodingException
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.execution.QueryExecutionException
+import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -382,6 +385,58 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  // =======================================
+  // Tests for parquet schema mismatch error
+  // =======================================
+  def testSchemaMismatch(path: String, vectorizedReaderEnabled: Boolean): 
SparkException = {
+    import testImplicits._
+
+    var e: SparkException = null
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
vectorizedReaderEnabled.toString) {
+      // Create two parquet files with different schemas in the same folder
+      Seq(("bcd", 2)).toDF("a", 
"b").coalesce(1).write.mode("overwrite").parquet(s"$path/parquet")
+      Seq((1, "abc")).toDF("a", 
"b").coalesce(1).write.mode("append").parquet(s"$path/parquet")
+
+      e = intercept[SparkException] {
+        spark.read.parquet(s"$path/parquet").collect()
+      }
+    }
+    e
+  }
+
+  test("schema mismatch failure error message for parquet reader") {
+    withTempPath { dir =>
+      val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled 
= false)
+      val expectedMessage = "Encounter error while reading parquet files. " +
+        "One possible cause: Parquet column cannot be converted in the 
corresponding " +
+        "files. Details:"
+      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.getCause.isInstanceOf[ParquetDecodingException])
+      assert(e.getCause.getMessage.startsWith(expectedMessage))
+    }
+  }
+
+  test("schema mismatch failure error message for parquet vectorized reader") {
+    withTempPath { dir =>
+      val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled 
= true)
+      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      
assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+
+      // Check if the physical type is reporting correctly
+      val errMsg = e.getCause.getMessage
+      assert(errMsg.startsWith("Parquet column cannot be converted in file"))
+      val file = errMsg.substring("Parquet column cannot be converted in file 
".length,
+        errMsg.indexOf(". "))
+      val col = 
spark.read.parquet(file).schema.fields.filter(_.name.equals("a"))
+      assert(col.length == 1)
+      if (col(0).dataType == StringType) {
+        assert(errMsg.contains("Column: [a], Expected: IntegerType, Found: 
BINARY"))
+      } else {
+        assert(errMsg.endsWith("Column: [a], Expected: StringType, Found: 
INT32"))
+      }
+    }
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to