voonhous commented on issue #17744:
URL: https://github.com/apache/hudi/issues/17744#issuecomment-3822740214
Writing with the shredding flag enabled and then reading with the shredding flag disabled causes the following error:
```scala
test("Write with Shredded Variant but read with shredded flag disabled") {
  assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
  // Shredding enabled with a forced schema => parquet should have typed_value
  withRecordType()(withTempDir { tmp =>
    val tableName = generateTableName
    // Enable Spark's shredding configs to see if they affect Hudi
    spark.conf.set("spark.sql.variant.allowReadingShredded", "true")
    spark.conf.set("spark.sql.variant.writeShredding.enabled", "true")
    // Force a specific shredding schema for testing
    spark.conf.set("spark.sql.variant.forceShreddingSchemaForTest", "age int")

    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  v variant,
         |  ts long
         |)
         |location '${tmp.getCanonicalPath}'
         |tblproperties (primaryKey='id', type='cow', preCombineField='ts')
         """.stripMargin)

    spark.sql(
      s"""
         |insert into $tableName values
         |  (1, parse_json('{"name": "Alice", "age": 30}'), 1000),
         |  (2, parse_json('{"name": "Bob", "age": 25}'), 1000)
         """.stripMargin)

    // First read: shredded reads still enabled, succeeds
    spark.sql(s"select * from $tableName").show(false)

    // Second read: shredded reads disabled, fails
    spark.conf.set("spark.sql.variant.allowReadingShredded", "false")
    spark.sql(s"select * from $tableName").show(false)
  })
}
```
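Note that only the second `show` fails: the first read, issued while `spark.sql.variant.allowReadingShredded` was still `true`, returns both rows, so the files themselves are readable and the failure is purely a reader-side config mismatch. A minimal sketch, assuming the same session and `tableName` as in the test above:

```scala
// The shredded files read back fine once shredded reads are re-enabled;
// only the read with the flag set to "false" trips the schema check.
spark.conf.set("spark.sql.variant.allowReadingShredded", "true")
spark.sql(s"select * from $tableName").show(false)
```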
Error:
```
[FAILED_READ_FILE.NO_HINT] Encountered error while reading file file:///private/var/folders/vh/zgs02hf51dn7r08pbl5m2jc00000gn/T/spark-86e380b5-d7ff-4080-adb2-197c0c46ad2f/part-00000-119c0db7-fc36-4d8e-8c0e-c1c7fb7f56da-c000.snappy.parquet. SQLSTATE: KD001
org.apache.spark.SparkException: [FAILED_READ_FILE.NO_HINT] Encountered error while reading file file:///private/var/folders/vh/zgs02hf51dn7r08pbl5m2jc00000gn/T/spark-86e380b5-d7ff-4080-adb2-197c0c46ad2f/part-00000-119c0db7-fc36-4d8e-8c0e-c1c7fb7f56da-c000.snappy.parquet. SQLSTATE: KD001
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:856)
    at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:142)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
    at org.apache.spark.scheduler.Task.run(Task.scala:147)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:544)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:497)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:58)
    at org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2244)
    at org.apache.spark.sql.classic.Dataset.$anonfun$head$1(Dataset.scala:1379)
    at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
    at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
    at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
    at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
    at org.apache.spark.sql.classic.Dataset.head(Dataset.scala:1379)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2810)
    at org.apache.spark.sql.classic.Dataset.getRows(Dataset.scala:339)
    at org.apache.spark.sql.classic.Dataset.showString(Dataset.scala:375)
    at org.apache.spark.sql.classic.Dataset.show(Dataset.scala:609)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:479)
    at org.apache.spark.sql.hudi.dml.schema.TestVariantDataType.$anonfun$new$17(TestVariantDataType.scala:367)
    at org.apache.spark.sql.hudi.dml.schema.TestVariantDataType.$anonfun$new$17$adapted(TestVariantDataType.scala:335)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.withTempDir(HoodieSparkSqlTestBase.scala:102)
    at org.apache.spark.sql.hudi.dml.schema.TestVariantDataType.$anonfun$new$16(TestVariantDataType.scala:335)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$withRecordType$3(HoodieSparkSqlTestBase.scala:375)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.withSQLConf(HoodieSparkSqlTestBase.scala:326)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$withRecordType$1(HoodieSparkSqlTestBase.scala:374)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$withRecordType$1$adapted(HoodieSparkSqlTestBase.scala:366)
    at scala.collection.immutable.List.foreach(List.scala:334)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.withRecordType(HoodieSparkSqlTestBase.scala:366)
    at org.apache.spark.sql.hudi.dml.schema.TestVariantDataType.$anonfun$new$15(TestVariantDataType.scala:335)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.$anonfun$test$1(HoodieSparkSqlTestBase.scala:114)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
    at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
    at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1562)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    at scala.collection.immutable.List.foreach(List.scala:334)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
    at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
    at org.scalatest.Suite.run(Suite.scala:1112)
    at org.scalatest.Suite.run$(Suite.scala:1094)
    at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
    at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.org$scalatest$BeforeAndAfterAll$$super$run(HoodieSparkSqlTestBase.scala:58)
    at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    at org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.run(HoodieSparkSqlTestBase.scala:58)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
    at scala.collection.immutable.List.foreach(List.scala:334)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
Caused by: org.apache.spark.sql.AnalysisException: [INVALID_VARIANT_FROM_PARQUET.WRONG_NUM_FIELDS] Invalid variant. Variant column must contain exactly two fields. SQLSTATE: 22023
    at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidVariantWrongNumFieldsError(QueryCompilationErrors.scala:2032)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertVariantField(ParquetSchemaConverter.scala:438)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:209)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$4(ParquetSchemaConverter.scala:163)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$4$adapted(ParquetSchemaConverter.scala:127)
    at scala.collection.immutable.Range.map(Range.scala:59)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertInternal(ParquetSchemaConverter.scala:127)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertParquetColumn(ParquetSchemaConverter.scala:103)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:131)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:195)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:292)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:230)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:289)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext0(FileScanRDD.scala:131)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:140)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
    at org.apache.spark.scheduler.Task.run(Task.scala:147)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
```
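The `Caused by` points at Spark's `ParquetToSparkSchemaConverter.convertVariantField` rather than at Hudi's own read path: a shredded write stores the variant column as a group with a `typed_value` field alongside `metadata` and `value` (as the test comment above anticipates), while a reader with `spark.sql.variant.allowReadingShredded=false` only accepts the plain two-field `metadata`/`value` layout, hence `WRONG_NUM_FIELDS`. One way to confirm what was actually written is to dump the footer of one of the data files under the table location; a minimal sketch using Parquet's Java API (the file path below is a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Placeholder: point this at one of the parquet files under the table location.
val dataFile = new Path("/path/to/table/part-00000-....snappy.parquet")

val reader = ParquetFileReader.open(
  HadoopInputFile.fromPath(dataFile, new Configuration()))
try {
  // For a shredded write, the group for column `v` should show three fields
  // (metadata, value, typed_value); an unshredded write shows only two.
  println(reader.getFooter.getFileMetaData.getSchema)
} finally {
  reader.close()
}
```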