[ 
https://issues.apache.org/jira/browse/HUDI-9791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-9791:
----------------------------------
    Description: 
Exception: [^hfile.error]

Summary:
 # Write an insert and update to mor table with spark 3.1 hudi 0.14.1
 # write an update with spark 3.5 with current master 
`adda6950e0aaa7353add88ee2fc0499d7135ee33` using write table version 6

 # Read table with spark 3.1 hudi 0.14.1  and get exception

The hoodie.properties still says table version is 6

Here is my runscript:
{code:java}
set_spark 3.1
hudi_spark_shell -p -v 0.14.1

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70
 ,"san_francisco"),
    
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90
 ,"san_francisco"),
    
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"
    ),
    
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  option("hoodie.metadata.index.column.stats.enable", "true").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  mode(Overwrite).
  save(basePath)
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === 
"rider-D").withColumn("fare", col("fare") * 10)

updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  option("hoodie.metadata.index.column.stats.enable", "true").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  mode(Append).
  save(basePath)
//exit
set_spark 3.5
hudi_spark_shell -j

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === 
"rider-D").withColumn("fare", col("fare") * 10)

updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  option("hoodie.metadata.index.column.stats.enable", "true").
  option("hoodie.write.table.version", "6").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  mode(Append).
  save(basePath)

//exit
set_spark 3.1
hudi_spark_shell -p -v 0.14.1

spark.read.format("hudi").option("hoodie.metadata.enable", 
"true").option("hoodie.enable.data.skipping", 
"true").option("hoodie.metadata.index.column.stats.enable", 
"true").load("/tmp/trips_table").filter("fare > 100").show(100,false) {code}
Command for running 0.14.1 with spark 3.1 using mvn package:
{code:java}
/Users/jon/Documents/sparkroot/spark-3.1.3-bin-hadoop3.2/bin/spark-shell 
--packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.14.1 --conf 
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 
'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'  --conf 
'spark.sql.catalogImplementation=in-memory' {code}
Command for running with current master on spark 3.5
{code:java}
/Users/jon/Documents/sparkroot/spark-3.5.2-bin-hadoop3/bin/spark-shell --jars 
/Users/jon/git/hudi-versions/current/spark3.5/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar
 --conf 
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 
'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' --conf 
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  --conf 'spark.sql.catalogImplementation=in-memory' {code}
 

 

  was:
Exception: [^hfile.error]

Summary:
 # Write an insert and update to mor table with spark 3.1 hudi 0.14.1
 # write an update with spark 3.5 with current master 
`adda6950e0aaa7353add88ee2fc0499d7135ee33` using write table version 6

 # Read table with spark 3.1 hudi 0.14.1  and get exception

The hoodie.properties still says table version is 6


Here is my runscript:
{code:java}
set_spark 3.1hudi_spark_shell -p -v 0.14.1
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  
Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    
(1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70
 ,"san_francisco"),
    
(1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90
 ,"san_francisco"),
    
(1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"
    ),
    
(1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));var
 inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  option("hoodie.metadata.index.column.stats.enable", "true").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  mode(Overwrite).
  save(basePath)
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === 
"rider-D").withColumn("fare", col("fare") * 10)updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  option("hoodie.metadata.index.column.stats.enable", "true").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  mode(Append).
  save(basePath)//exitset_spark 3.5
hudi_spark_shell -j
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === 
"rider-D").withColumn("fare", col("fare") * 10)updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  option("hoodie.metadata.index.column.stats.enable", "true").
  option("hoodie.write.table.version", "6").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  mode(Append).
  save(basePath)//exit
set_spark 3.1hudi_spark_shell -p -v 
0.14.1spark.read.format("hudi").option("hoodie.metadata.enable", 
"true").option("hoodie.enable.data.skipping", 
"true").option("hoodie.metadata.index.column.stats.enable", 
"true").load("/tmp/trips_table").filter("fare > 100").show(100,false) {code}


Command for running 0.14.1 with spark 3.1 using mvn package:
{code:java}
/Users/jon/Documents/sparkroot/spark-3.1.3-bin-hadoop3.2/bin/spark-shell 
--packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.14.1 --conf 
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 
'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'  --conf 
'spark.sql.catalogImplementation=in-memory' {code}

Command for running with current master on spark 3.5


{code:java}
/Users/jon/Documents/sparkroot/spark-3.5.2-bin-hadoop3/bin/spark-shell --jars 
/Users/jon/git/hudi-versions/current/spark3.5/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar
 --conf 
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 
'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' --conf 
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  --conf 'spark.sql.catalogImplementation=in-memory' {code}
 

 


> MDT breaks with hfile reader changes when writing with master using table 
> version 6
> -----------------------------------------------------------------------------------
>
>                 Key: HUDI-9791
>                 URL: https://issues.apache.org/jira/browse/HUDI-9791
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: metadata
>            Reporter: Jonathan Vexler
>            Assignee: Lin Liu
>            Priority: Blocker
>             Fix For: 1.1.0
>
>         Attachments: hfile.error
>
>
> Exception: [^hfile.error]
> Summary:
>  # Write an insert and update to mor table with spark 3.1 hudi 0.14.1
>  # write an update with spark 3.5 with current master 
> `adda6950e0aaa7353add88ee2fc0499d7135ee33` using write table version 6
>  # Read table with spark 3.1 hudi 0.14.1  and get exception
> The hoodie.properties still says table version is 6
> Here is my runscript:
> {code:java}
> set_spark 3.1
> hudi_spark_shell -p -v 0.14.1
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.common.table.HoodieTableConfig._
> import org.apache.hudi.config.HoodieWriteConfig._
> import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
> import org.apache.hudi.common.model.HoodieRecord
> import spark.implicits._val tableName = "trips_table"
> val basePath = "file:///tmp/trips_table"
> val columns = Seq("ts","uuid","rider","driver","fare","city")
> val data =
>   
> Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
>     
> (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70
>  ,"san_francisco"),
>     
> (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90
>  ,"san_francisco"),
>     
> (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"
>     ),
>     
> (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
> var inserts = spark.createDataFrame(data).toDF(columns:_*)
> inserts.write.format("hudi").
>   option("hoodie.datasource.write.partitionpath.field", "city").
>   option("hoodie.table.name", tableName).
>   option("hoodie.metadata.index.column.stats.enable", "true").
>   option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
>   mode(Overwrite).
>   save(basePath)
> val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === 
> "rider-D").withColumn("fare", col("fare") * 10)
> updatesDf.write.format("hudi").
>   option("hoodie.datasource.write.operation", "upsert").
>   option("hoodie.datasource.write.partitionpath.field", "city").
>   option("hoodie.table.name", tableName).
>   option("hoodie.metadata.index.column.stats.enable", "true").
>   option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
>   mode(Append).
>   save(basePath)
> //exit
> set_spark 3.5
> hudi_spark_shell -j
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.common.table.HoodieTableConfig._
> import org.apache.hudi.config.HoodieWriteConfig._
> import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
> import org.apache.hudi.common.model.HoodieRecord
> import spark.implicits._val tableName = "trips_table"
> val basePath = "file:///tmp/trips_table"
> val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === 
> "rider-D").withColumn("fare", col("fare") * 10)
> updatesDf.write.format("hudi").
>   option("hoodie.datasource.write.operation", "upsert").
>   option("hoodie.datasource.write.partitionpath.field", "city").
>   option("hoodie.table.name", tableName).
>   option("hoodie.metadata.index.column.stats.enable", "true").
>   option("hoodie.write.table.version", "6").
>   option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
>   mode(Append).
>   save(basePath)
> //exit
> set_spark 3.1
> hudi_spark_shell -p -v 0.14.1
> spark.read.format("hudi").option("hoodie.metadata.enable", 
> "true").option("hoodie.enable.data.skipping", 
> "true").option("hoodie.metadata.index.column.stats.enable", 
> "true").load("/tmp/trips_table").filter("fare > 100").show(100,false) {code}
> Command for running 0.14.1 with spark 3.1 using mvn package:
> {code:java}
> /Users/jon/Documents/sparkroot/spark-3.1.3-bin-hadoop3.2/bin/spark-shell 
> --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.14.1 --conf 
> 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 
> 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'  --conf 
> 'spark.sql.catalogImplementation=in-memory' {code}
> Command for running with current master on spark 3.5
> {code:java}
> /Users/jon/Documents/sparkroot/spark-3.5.2-bin-hadoop3/bin/spark-shell --jars 
> /Users/jon/git/hudi-versions/current/spark3.5/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar
>  --conf 
> 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 
> 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' --conf 
> 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
>   --conf 'spark.sql.catalogImplementation=in-memory' {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to