voonhous opened a new issue, #7444: URL: https://github.com/apache/hudi/issues/7444
- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? (Yes) - Join the mailing list to engage in conversations and get faster support at [email protected]. - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly. Implicit schema changes that do not write to the `.schema` folder will cause read issues on Spark's end. The current implementation of Schema Evolution is as follows: If the schema change is supported by Avro's Schema resolution, `ALTER TABLE DDL` is not required. The column type changes that are supported by Avro's Schema resolution are as follows:  **Caveat:** The current implementation is sufficient provided that ALL data is re-written with the new schema. However, if there are certain filegroups/partitions that are still in the old schema when being read out, errors will be thrown. As such, the current support for implicit schema changes is still a little buggy when it comes to column type changes. To reproduce the issue, one can use the script below to test the schema evolution that is "allegedly" supported by Hudi's implicit schema change support. What the test does is write a partition in the old schema, followed by inserting a row with a new schema into another partition. **Steps to reproduce the behavior:** ```scala /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package org.apache.hudi import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.{DataFrame, SparkSession} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} class TestAvroSchemaResolutionSupportError extends HoodieClientTestBase { var spark: SparkSession = _ val commonOpts: Map[String, String] = Map( HoodieWriteConfig.TBL_NAME.key -> "hoodie_avro_schema_resolution_support", "hoodie.insert.shuffle.parallelism" -> "1", "hoodie.upsert.shuffle.parallelism" -> "1", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id", DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name", DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator", HoodieMetadataConfig.ENABLE.key -> "false" ) /** * Setup method running before each test. 
*/ @BeforeEach override def setUp(): Unit = { setTableName("hoodie_avro_schema_resolution_support") initPath() initSparkContexts() spark = sqlContext.sparkSession } @AfterEach override def tearDown(): Unit = { cleanupSparkContexts() } def castColToX(x: Int, colToCast: String, df: DataFrame): DataFrame = x match { case 0 => df.withColumn(colToCast, df.col(colToCast).cast("long")) case 1 => df.withColumn(colToCast, df.col(colToCast).cast("float")) case 2 => df.withColumn(colToCast, df.col(colToCast).cast("double")) case 3 => df.withColumn(colToCast, df.col(colToCast).cast("binary")) case 4 => df.withColumn(colToCast, df.col(colToCast).cast("string")) } def initialiseTable(df: DataFrame, saveDir: String): Unit = { df.write.format("hudi") .options(commonOpts) .mode("overwrite") .save(saveDir) } def upsertData(df: DataFrame, saveDir: String): Unit = { df.write.format("hudi") .options(commonOpts) .mode("append") .save(saveDir) } @Test def testDataTypePromotion(): Unit = { val _spark = spark import _spark.implicits._ val colToCast = "userId" val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name") val df2 = Seq((2, 200L, "bbb")).toDF("id", "userid", "name") val tempRecordPath = basePath + "/record_tbl/" def doTest(colInitType: String, start: Int, end: Int): Unit = { for (a <- Range(start, end)) { try { Console.println(s"Performing test: $a with $colInitType") // convert int to string first before conversion to binary val initDF = if (colInitType == "binary") { val castDf1 = df1.withColumn(colToCast, df1.col(colToCast).cast("string")) castDf1.withColumn(colToCast, castDf1.col(colToCast).cast(colInitType)) } else { df1.withColumn(colToCast, df1.col(colToCast).cast(colInitType)) } initDF.printSchema() initDF.show(false) // recreate table initialiseTable(initDF, tempRecordPath) // perform avro supported casting var upsertDf = df2 upsertDf = castColToX(a, colToCast, upsertDf) upsertDf.printSchema() upsertDf.show(false) // upsert upsertData(upsertDf, tempRecordPath) // read 
out the table val readDf = spark.read.format("hudi").load(tempRecordPath) readDf.printSchema() readDf.show(false) readDf.foreach(_ => {}) assert(true) } catch { case e: Exception => { // e.printStackTrace() // Console.println(s"Test $a failed with error: ${e.getMessage}") assert(false, e) } } } } // INT -> [Long, Float, Double, String] doTest("int", 0, 3) // Long -> [Float, Double, String] doTest("long", 1, 3) // Float -> [Double, String] doTest("float", 2, 3) // String -> [Bytes] doTest("string", 3, 4) // Bytes -> [String] doTest("binary", 4, 5) } } ``` 1. Copy and paste the snippet into: /Users/voonhou.su/IdeaProjects/hudi_backup/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupportError.scala 2. Switch profile to Spark3 3. Run the function/test `testDataTypePromotion` as a test case **Expected behavior** Able to do a full table scan. **Environment Description** * Hudi version : 0.10, 0.11, 0.12, 0.13 * Spark version : 3.x * Hive version : NIL * Hadoop version : NIL * Storage (HDFS/S3/GCS..) : NIL * Running on Docker? (yes/no) : NO **Additional context** Add any other context about the problem here. **Stacktrace** ```log java.lang.AssertionError: assertion failed: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 53) (1.2.3.4 executor driver): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///var/folders/p_/09zfm5sx3v14w97hhk4vqrn8s817xt/T/junit5722563086978229716/dataset/record_tbl/aaa/bec9f5b7-09e6-40c3-9c53-de8bbaa2d656-0_0-14-19_20221213200558758.parquet. 
Column: [userId], Expected: bigint, Found: INT32 at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:339) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:571) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:294) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:181) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173) ... 20 more ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
