voonhous opened a new issue, #7444:
URL: https://github.com/apache/hudi/issues/7444

   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? (Yes)
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   Implicit schema changes, i.e. schema changes that are not persisted to the `.schema` folder, will cause read issues on Spark's end.
   
   The current implementation of schema evolution works as follows: if the schema change is supported by Avro's schema resolution, no `ALTER TABLE` DDL is required.
   
   The column type changes supported by Avro's schema resolution are as follows:
   
![image](https://user-images.githubusercontent.com/6312314/207312745-1e8a63a6-7423-4ff9-aa20-327c11d8bda5.png)
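   
   For illustration, an implicit change of this kind is simply a second write whose DataFrame carries the promoted type; no DDL is issued. Below is a minimal sketch, assuming a `SparkSession` named `spark`, a Hudi base path `basePath`, and write options `hudiOpts` equivalent to the ones used in the reproduction test further down:
   
   ```scala
   import spark.implicits._
   
   // First write: `userid` is an int, so the first file group stores it as INT32.
   Seq((1, 100, "aaa")).toDF("id", "userid", "name")
     .write.format("hudi").options(hudiOpts).mode("overwrite").save(basePath)
   
   // Second write: `userid` is a long. No ALTER TABLE is issued; the promotion is
   // implicit and is only resolved at read time via Avro schema resolution.
   Seq((2, 200L, "bbb")).toDF("id", "userid", "name")
     .write.format("hudi").options(hudiOpts).mode("append").save(basePath)
   ```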
   
   **Caveat:**
   The current implementation is sufficient provided that ALL data is re-written with the new schema. However, if some file groups/partitions are still in the old schema when the table is read, errors will be thrown.
   
   As such, the current support for implicit column type changes is still somewhat buggy.
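   
   The failure surfaces at the Parquet reader level: the footer of an old file group still records the column as INT32, while the evolved read schema expects bigint, and Spark's vectorized Parquet reader does not perform that widening. A minimal sketch outside of Hudi (assuming a local `SparkSession` and a hypothetical scratch path) hits the same error:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
   
   val spark = SparkSession.builder().master("local[1]").appName("int32-vs-bigint").getOrCreate()
   import spark.implicits._
   
   // Hypothetical scratch location, purely for illustration.
   val path = "/tmp/int32_vs_bigint_parquet"
   
   // Write a file whose Parquet footer records `userid` as INT32.
   Seq((1, 100, "aaa")).toDF("id", "userid", "name").write.mode("overwrite").parquet(path)
   
   // Read the same file back, declaring `userid` as LongType (the "evolved" schema).
   val evolved = StructType(Seq(
     StructField("id", IntegerType),
     StructField("userid", LongType),
     StructField("name", StringType)))
   
   // On Spark 3.x with the vectorized reader this fails with
   // "Parquet column cannot be converted ... Expected: bigint, Found: INT32".
   spark.read.schema(evolved).parquet(path).show(false)
   ```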
   
   To reproduce the issue, use the script below, which exercises the schema evolution that is "allegedly" supported by Hudi's implicit schema change support.
   
   The test first writes a partition with the old schema, then inserts a row with the new schema into another partition.
   
   **Steps to reproduce the behavior:**
   
   ```scala
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   package org.apache.hudi
   
   import org.apache.hudi.common.config.HoodieMetadataConfig
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.testutils.HoodieClientTestBase
   import org.apache.spark.sql.{DataFrame, SparkSession}
   import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
   
   class TestAvroSchemaResolutionSupportError extends HoodieClientTestBase {
   
     var spark: SparkSession = _
     val commonOpts: Map[String, String] = Map(
       HoodieWriteConfig.TBL_NAME.key -> "hoodie_avro_schema_resolution_support",
       "hoodie.insert.shuffle.parallelism" -> "1",
       "hoodie.upsert.shuffle.parallelism" -> "1",
       DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
       HoodieMetadataConfig.ENABLE.key -> "false"
     )
   
     /**
      * Setup method running before each test.
      */
     @BeforeEach override def setUp(): Unit = {
       setTableName("hoodie_avro_schema_resolution_support")
       initPath()
       initSparkContexts()
       spark = sqlContext.sparkSession
     }
   
     @AfterEach override def tearDown(): Unit = {
       cleanupSparkContexts()
     }
   
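     // Casts the upsert column to the target type for test case `x`:
     // 0 -> long, 1 -> float, 2 -> double, 3 -> binary, 4 -> string.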
     def castColToX(x: Int, colToCast: String, df: DataFrame): DataFrame = x match {
       case 0 => df.withColumn(colToCast, df.col(colToCast).cast("long"))
       case 1 => df.withColumn(colToCast, df.col(colToCast).cast("float"))
       case 2 => df.withColumn(colToCast, df.col(colToCast).cast("double"))
       case 3 => df.withColumn(colToCast, df.col(colToCast).cast("binary"))
       case 4 => df.withColumn(colToCast, df.col(colToCast).cast("string"))
     }
   
     def initialiseTable(df: DataFrame, saveDir: String): Unit = {
       df.write.format("hudi")
         .options(commonOpts)
         .mode("overwrite")
         .save(saveDir)
     }
   
     def upsertData(df: DataFrame, saveDir: String): Unit = {
       df.write.format("hudi")
         .options(commonOpts)
         .mode("append")
         .save(saveDir)
     }
   
     @Test def testDataTypePromotion(): Unit = {
       val _spark = spark
       import _spark.implicits._
   
       val colToCast = "userId"
       val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
       val df2 = Seq((2, 200L, "bbb")).toDF("id", "userid", "name")
       val tempRecordPath = basePath + "/record_tbl/"
   
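       // For each test index in [start, end): re-create the table with `colToCast` cast to
       // `colInitType`, upsert a row whose column has been cast via castColToX, then read the
       // whole table back so that both the old- and new-schema file groups are scanned.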
       def doTest(colInitType: String, start: Int, end: Int): Unit = {
         for (a <- Range(start, end)) {
           try {
             Console.println(s"Performing test: $a with $colInitType")
   
             // convert int to string first before conversion to binary
             val initDF = if (colInitType == "binary") {
               val castDf1 = df1.withColumn(colToCast, df1.col(colToCast).cast("string"))
               castDf1.withColumn(colToCast, castDf1.col(colToCast).cast(colInitType))
             } else {
               df1.withColumn(colToCast, df1.col(colToCast).cast(colInitType))
             }
             initDF.printSchema()
             initDF.show(false)
   
             // recreate table
             initialiseTable(initDF, tempRecordPath)
   
             // perform avro supported casting
             var upsertDf = df2
             upsertDf = castColToX(a, colToCast, upsertDf)
             upsertDf.printSchema()
             upsertDf.show(false)
   
             // upsert
             upsertData(upsertDf, tempRecordPath)
   
             // read out the table
             val readDf = spark.read.format("hudi").load(tempRecordPath)
             readDf.printSchema()
             readDf.show(false)
             readDf.foreach(_ => {})
   
             assert(true)
           } catch {
             case e: Exception => {
               // e.printStackTrace()
               // Console.println(s"Test $a failed with error: ${e.getMessage}")
               assert(false, e)
             }
           }
         }
       }
   
       // Int -> [Long, Float, Double]
       doTest("int", 0, 3)
       // Long -> [Float, Double]
       doTest("long", 1, 3)
       // Float -> [Double]
       doTest("float", 2, 3)
       // String -> [Bytes]
       doTest("string", 3, 4)
       // Bytes -> [String]
       doTest("binary", 4, 5)
     }
   }
   
   ```
   
   1. Copy and paste the snippet into: `hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupportError.scala`
   2. Switch the build profile to Spark 3
   3. Run `testDataTypePromotion` as a test case
   
   **Expected behavior**
   
   A full table scan succeeds, returning rows from both the old-schema and new-schema file groups with the column read back as the promoted type.
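   
   Concretely, for the rows used in the test, reading the table back should return something like the following (illustrative only; Hudi metadata columns omitted), with `userid` widened to bigint:
   
   ```
   +---+------+----+
   | id|userid|name|
   +---+------+----+
   |  1|   100| aaa|
   |  2|   200| bbb|
   +---+------+----+
   ```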
   
   **Environment Description**
   
   * Hudi version : 0.10, 0.11, 0.12, 0.13
   
   * Spark version : 3.x
   
   * Hive version : NIL
   
   * Hadoop version : NIL
   
   * Storage (HDFS/S3/GCS..) : NIL
   
   * Running on Docker? (yes/no) : NO
   
   
   
   **Stacktrace**
   
   ```log
   java.lang.AssertionError: assertion failed: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 53) (1.2.3.4 executor driver): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///var/folders/p_/09zfm5sx3v14w97hhk4vqrn8s817xt/T/junit5722563086978229716/dataset/record_tbl/aaa/bec9f5b7-09e6-40c3-9c53-de8bbaa2d656-0_0-14-19_20221213200558758.parquet. Column: [userId], Expected: bigint, Found: INT32
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:339)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:571)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:294)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:181)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
        ... 20 more
   ```
   
   

