[ 
https://issues.apache.org/jira/browse/HUDI-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

voon updated HUDI-5977:
-----------------------
    Description: 
When a Date -> String type conversion is performed and when the non-vectorized 
reader is used, the table becomes unreadable.

 

Test case to replicate this issue

 
{code:java}
test("Test DATE to STRING conversions when vectorized reading is not enabled") {
  val tableName = generateTableName
  spark.sql(
    s"""
       | create table $tableName (
       |  id int,
       |  name string,
       |  price double,
       |  ts long
       |) using hudi
       | partitioned by (ts)
       |tblproperties (
       |  primaryKey = 'id'
       )
     """.stripMargin)
  spark.sql(
    s"""
       | insert into $tableName
       | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
      """.stripMargin)
  spark.sql("set hoodie.schema.on.read.enable = true")
  // adding a struct column to force reads to use non-vectorized readers
  spark.sql(s"alter table $tableName add column (`new_struct_col` STRUCT<f0: 
INTEGER, f1: STRING>)")
  spark.sql(
    s"""
       | insert into $tableName
       | values (2, 'a2', 20, struct(2, 'f_2'), 1001)
      """.stripMargin)
  spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
  spark.sql(
    s"""
       | insert into $tableName
       | values (3, 'a3', 30, struct(3, 'f_3'), date '2023-03-22', 1002)
      """.stripMargin)
  spark.sql(s"alter table $tableName alter column `date_to_string_col` type 
string")
  // struct and string (converted from date) column must be read to ensure that 
non-vectorized reader is used
  checkAnswer(s"select * from $tableName")(
    Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
    Seq("year=2021/month=02/day=01")
  )
}{code}
 

Stacktrace
{code:java}
Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 27.0 (TID 32) (10.2.174.68 executor 
driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at 
org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId(datetimeExpressions.scala:53)
    at 
org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId$(datetimeExpressions.scala:53)
    at 
org.apache.spark.sql.catalyst.expressions.CastBase.zoneId$lzycompute(Cast.scala:254)
    at org.apache.spark.sql.catalyst.expressions.CastBase.zoneId(Cast.scala:254)
    at 
org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter$lzycompute(Cast.scala:297)
    at 
org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter(Cast.scala:297)
    at 
org.apache.spark.sql.catalyst.expressions.CastBase.castToStringCode(Cast.scala:1059)
    at 
org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeCastFunction(Cast.scala:871)
    at 
org.apache.spark.sql.catalyst.expressions.CastBase.doGenCode(Cast.scala:854)
    at 
org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
    at scala.Option.getOrElse(Option.scala:189)
    at 
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
    at 
org.apache.spark.sql.catalyst.expressions.CastBase.genCode(Cast.scala:848)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$1(CodeGenerator.scala:1187)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1187)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:290)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:338)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:331)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:34)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1278)
    at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1275)
    at 
org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark31HoodieParquetFileFormat.scala:345)
    at 
org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
    at 
org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:603)
    at 
org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:665)
    at 
org.apache.hudi.BaseFileOnlyRelation.$anonfun$composeRDD$1(BaseFileOnlyRelation.scala:103)
    at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
    at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
    at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750) {code}

  was:
When a Date -> String type conversion is performed and when the non-vectorized 
reader is used, the table becomes unreadable.

 

Test case to replicate this issue

 
{code:java}
test("Test DATE to STRING conversions when vectorized reading is not enabled") {
  val tableName = generateTableName
  spark.sql(
    s"""
       | create table $tableName (
       |  id int,
       |  name string,
       |  price double,
       |  ts long
       |) using hudi
       | partitioned by (ts)
       |tblproperties (
       |  primaryKey = 'id'
       )
     """.stripMargin)
  spark.sql(
    s"""
       | insert into $tableName
       | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
      """.stripMargin)
  spark.sql("set hoodie.schema.on.read.enable = true")
  // adding a struct column to force reads to use non-vectorized readers
  spark.sql(s"alter table $tableName add column (`new_struct_col` STRUCT<f0: 
INTEGER, f1: STRING>)")
  spark.sql(
    s"""
       | insert into $tableName
       | values (2, 'a2', 20, struct(2, 'f_2'), 1001)
      """.stripMargin)
  spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
  spark.sql(
    s"""
       | insert into $tableName
       | values (3, 'a3', 30, struct(3, 'f_3'), date '2023-03-22', 1002)
      """.stripMargin)
  spark.sql(s"alter table $tableName alter column `date_to_string_col` type 
string")
  // struct and string (converted from date) column must be read to ensure that 
non-vectorized reader is used
  checkAnswer(s"select * from $tableName")(
    Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
    Seq("year=2021/month=02/day=01")
  )
}{code}
 

 


> Fix Date to String casts when non-vectorized readers are used
> -------------------------------------------------------------
>
>                 Key: HUDI-5977
>                 URL: https://issues.apache.org/jira/browse/HUDI-5977
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>
> When a Date -> String type conversion is performed and when the 
> non-vectorized reader is used, the table becomes unreadable.
>  
> Test case to replicate this issue
>  
> {code:java}
> test("Test DATE to STRING conversions when vectorized reading is not 
> enabled") {
>   val tableName = generateTableName
>   spark.sql(
>     s"""
>        | create table $tableName (
>        |  id int,
>        |  name string,
>        |  price double,
>        |  ts long
>        |) using hudi
>        | partitioned by (ts)
>        |tblproperties (
>        |  primaryKey = 'id'
>        )
>      """.stripMargin)
>   spark.sql(
>     s"""
>        | insert into $tableName
>        | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
>       """.stripMargin)
>   spark.sql("set hoodie.schema.on.read.enable = true")
>   // adding a struct column to force reads to use non-vectorized readers
>   spark.sql(s"alter table $tableName add column (`new_struct_col` STRUCT<f0: 
> INTEGER, f1: STRING>)")
>   spark.sql(
>     s"""
>        | insert into $tableName
>        | values (2, 'a2', 20, struct(2, 'f_2'), 1001)
>       """.stripMargin)
>   spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
>   spark.sql(
>     s"""
>        | insert into $tableName
>        | values (3, 'a3', 30, struct(3, 'f_3'), date '2023-03-22', 1002)
>       """.stripMargin)
>   spark.sql(s"alter table $tableName alter column `date_to_string_col` type 
> string")
>   // struct and string (converted from date) column must be read to ensure 
> that non-vectorized reader is used
>   checkAnswer(s"select * from $tableName")(
>     Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
>     Seq("year=2021/month=02/day=01")
>   )
> }{code}
>  
> Stacktrace
> {code:java}
> Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most 
> recent failure: Lost task 0.0 in stage 27.0 (TID 32) (10.2.174.68 executor 
> driver): java.util.NoSuchElementException: None.get
>     at scala.None$.get(Option.scala:529)
>     at scala.None$.get(Option.scala:527)
>     at 
> org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId(datetimeExpressions.scala:53)
>     at 
> org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId$(datetimeExpressions.scala:53)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.zoneId$lzycompute(Cast.scala:254)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.zoneId(Cast.scala:254)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter$lzycompute(Cast.scala:297)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter(Cast.scala:297)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.castToStringCode(Cast.scala:1059)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeCastFunction(Cast.scala:871)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.doGenCode(Cast.scala:854)
>     at 
> org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
>     at scala.Option.getOrElse(Option.scala:189)
>     at 
> org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
>     at 
> org.apache.spark.sql.catalyst.expressions.CastBase.genCode(Cast.scala:848)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$1(CodeGenerator.scala:1187)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>     at scala.collection.immutable.List.map(List.scala:298)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1187)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:290)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:338)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:331)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:34)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1278)
>     at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1275)
>     at 
> org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark31HoodieParquetFileFormat.scala:345)
>     at 
> org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
>     at 
> org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:603)
>     at 
> org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:665)
>     at 
> org.apache.hudi.BaseFileOnlyRelation.$anonfun$composeRDD$1(BaseFileOnlyRelation.scala:103)
>     at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
>     at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
>     at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to