[
https://issues.apache.org/jira/browse/HUDI-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
voon updated HUDI-5977:
-----------------------
Description:
When a Date -> String type conversion is performed and when the non-vectorized
reader is used, the table becomes unreadable.
Test casae to replicate this issue
{code:java}
test("Test DATE to STRING conversions when vectorized reading is not enabled") {
val tableName = generateTableName
spark.sql(
s"""
| create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| partitioned by (ts)
|tblproperties (
| primaryKey = 'id'
)
""".stripMargin)
spark.sql(
s"""
| insert into $tableName
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
""".stripMargin)
spark.sql("set hoodie.schema.on.read.enable = true") // adding a struct
column to force reads to use non-vectorized readers
spark.sql(s"alter table $tableName add column (`new_struct_col` STRUCT<f0:
INTEGER, f1: STRING>)")
spark.sql(
s"""
| insert into $tableName
| values (2, 'a2', 20, struct(2, 'f_2'), 1001)
""".stripMargin) spark.sql(s"alter table $tableName add column
(`date_to_string_col` date)")
spark.sql(
s"""
| insert into $tableName
| values (3, 'a3', 30, struct(3, 'f_3'), date '2023-03-22', 1002)
""".stripMargin)
spark.sql(s"alter table $tableName alter column `date_to_string_col` type
string")
// struct and string (converted from date) column must be read to ensure that
non-vectorized reader is used
checkAnswer(s"select * from $tableName")(
Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
Seq("year=2021/month=02/day=01")
)
}{code}
Stacktrace
{code:java}
Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 27.0 (TID 32) (10.2.174.68 executor
driver): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at
org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId(datetimeExpressions.scala:53)
at
org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId$(datetimeExpressions.scala:53)
at
org.apache.spark.sql.catalyst.expressions.CastBase.zoneId$lzycompute(Cast.scala:254)
at org.apache.spark.sql.catalyst.expressions.CastBase.zoneId(Cast.scala:254)
at
org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter$lzycompute(Cast.scala:297)
at
org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter(Cast.scala:297)
at
org.apache.spark.sql.catalyst.expressions.CastBase.castToStringCode(Cast.scala:1059)
at
org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeCastFunction(Cast.scala:871)
at
org.apache.spark.sql.catalyst.expressions.CastBase.doGenCode(Cast.scala:854)
at
org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
at
org.apache.spark.sql.catalyst.expressions.CastBase.genCode(Cast.scala:848)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$1(CodeGenerator.scala:1187)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1187)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:290)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:338)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:331)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:34)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1278)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1275)
at
org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark31HoodieParquetFileFormat.scala:345)
at
org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
at
org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:603)
at
org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:665)
at
org.apache.hudi.BaseFileOnlyRelation.$anonfun$composeRDD$1(BaseFileOnlyRelation.scala:103)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750) {code}
was:
When a Date -> String type conversion is performed and when the non-vectorized
reader is used, the table becomes unreadable.
Test casae to replicate this issue
{code:java}
test("Test DATE to STRING conversions when vectorized reading is not enabled") {
val tableName = generateTableName
spark.sql(
s"""
| create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| partitioned by (ts)
|tblproperties (
| primaryKey = 'id'
)
""".stripMargin)
spark.sql(
s"""
| insert into $tableName
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
""".stripMargin)
spark.sql("set hoodie.schema.on.read.enable = true") // adding a struct
column to force reads to use non-vectorized readers
spark.sql(s"alter table $tableName add column (`new_struct_col` STRUCT<f0:
INTEGER, f1: STRING>)")
spark.sql(
s"""
| insert into $tableName
| values (2, 'a2', 20, struct(2, 'f_2'), 1001)
""".stripMargin) spark.sql(s"alter table $tableName add column
(`date_to_string_col` date)")
spark.sql(
s"""
| insert into $tableName
| values (3, 'a3', 30, struct(3, 'f_3'), date '2023-03-22', 1002)
""".stripMargin)
spark.sql(s"alter table $tableName alter column `date_to_string_col` type
string")
// struct and string (converted from date) column must be read to ensure that
non-vectorized reader is used
checkAnswer(s"select * from $tableName")(
Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
Seq("year=2021/month=02/day=01")
)
}{code}
> Fix Date to String casts when non-vectorized readers are used
> -------------------------------------------------------------
>
> Key: HUDI-5977
> URL: https://issues.apache.org/jira/browse/HUDI-5977
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: voon
> Assignee: voon
> Priority: Major
> Labels: pull-request-available
>
> When a Date -> String type conversion is performed and when the
> non-vectorized reader is used, the table becomes unreadable.
>
> Test casae to replicate this issue
>
> {code:java}
> test("Test DATE to STRING conversions when vectorized reading is not
> enabled") {
> val tableName = generateTableName
> spark.sql(
> s"""
> | create table $tableName (
> | id int,
> | name string,
> | price double,
> | ts long
> |) using hudi
> | partitioned by (ts)
> |tblproperties (
> | primaryKey = 'id'
> )
> """.stripMargin)
> spark.sql(
> s"""
> | insert into $tableName
> | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
> """.stripMargin)
> spark.sql("set hoodie.schema.on.read.enable = true") // adding a struct
> column to force reads to use non-vectorized readers
> spark.sql(s"alter table $tableName add column (`new_struct_col` STRUCT<f0:
> INTEGER, f1: STRING>)")
> spark.sql(
> s"""
> | insert into $tableName
> | values (2, 'a2', 20, struct(2, 'f_2'), 1001)
> """.stripMargin) spark.sql(s"alter table $tableName add column
> (`date_to_string_col` date)")
> spark.sql(
> s"""
> | insert into $tableName
> | values (3, 'a3', 30, struct(3, 'f_3'), date '2023-03-22', 1002)
> """.stripMargin)
> spark.sql(s"alter table $tableName alter column `date_to_string_col` type
> string")
> // struct and string (converted from date) column must be read to ensure
> that non-vectorized reader is used
> checkAnswer(s"select * from $tableName")(
> Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
> Seq("year=2021/month=02/day=01")
> )
> }{code}
>
> Stacktrace
> {code:java}
> Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most
> recent failure: Lost task 0.0 in stage 27.0 (TID 32) (10.2.174.68 executor
> driver): java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:529)
> at scala.None$.get(Option.scala:527)
> at
> org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId(datetimeExpressions.scala:53)
> at
> org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId$(datetimeExpressions.scala:53)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.zoneId$lzycompute(Cast.scala:254)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.zoneId(Cast.scala:254)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter$lzycompute(Cast.scala:297)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter(Cast.scala:297)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.castToStringCode(Cast.scala:1059)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeCastFunction(Cast.scala:871)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.doGenCode(Cast.scala:854)
> at
> org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
> at scala.Option.getOrElse(Option.scala:189)
> at
> org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
> at
> org.apache.spark.sql.catalyst.expressions.CastBase.genCode(Cast.scala:848)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$1(CodeGenerator.scala:1187)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> at scala.collection.immutable.List.map(List.scala:298)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1187)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:290)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:338)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:331)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:34)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1278)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1275)
> at
> org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark31HoodieParquetFileFormat.scala:345)
> at
> org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
> at
> org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:603)
> at
> org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:665)
> at
> org.apache.hudi.BaseFileOnlyRelation.$anonfun$composeRDD$1(BaseFileOnlyRelation.scala:103)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)