rmehlitz opened a new issue, #5715:
URL: https://github.com/apache/hudi/issues/5715

   Hello Hudi-community,
   
   we have struggled to get Hudi time travel working with version 0.11.0. 
We always get the newest snapshot instead of the table state at the time we 
request. This worked with Hudi 0.10.1.
   
   We process the data in AWS via Glue or EMR and write the table to S3.
   We used the hudi-spark3.1-bundle_2.12-0.11.0.jar and set the following Spark 
session configs:
   
   
   ```scala
   implicit val spark: SparkSession = SparkSession.builder
     .master(config.master)
     .appName("Hudi-Preprocessor")
     .config("spark.app.id", "Hudi-Preprocessor")
     .config("spark.sql.parquet.mergeSchema", value = true)
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .config("spark.sql.hive.convertMetastoreParquet", "false")
     .config("spark.sql.sources.partitionColumnTypeInference.enabled", value = false)
     .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
     .enableHiveSupport()
     .getOrCreate()
   ```
   
   For writing data we used the following Hudi options:
   
   ```scala
   val hudiOptions = Map[String, String](
     HoodieWriteConfig.TBL_NAME.key() -> "my_table",
     DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
     DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "id",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "creation_date",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "last_update_time",
     HiveSyncConfig.HIVE_SYNC_ENABLED.key() -> "true",
     DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true",
     HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key() -> "creation_date",
     HoodieSyncConfig.META_SYNC_DATABASE_NAME.key() -> "default",
     HoodieSyncConfig.META_SYNC_TABLE_NAME.key() -> "my_table",
     HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key() -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
     HiveSyncConfig.HIVE_SYNC_MODE.key() -> "hms",
     HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key() -> "true",
     HoodieIndexConfig.INDEX_TYPE.key() -> "GLOBAL_SIMPLE",
     HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key() -> "true",
     DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "true"
   )
   ```
   
   Using the following data set, we wrote an initial table and then upserted 
records. After the upsert, the time travel query always returns the current 
snapshot; we never get the snapshot from before the upsert.
   
   ```scala
   // initial data frame
   val initialInputDf = Seq(
     (false, "100", "2015-01-01", "2015-01-01T13:51:39.340396Z", 1, "much value"),
     (false, "101", "2015-01-01", "2015-01-01T12:14:58.597216Z", 5, "not so much value")
   ).toDF("_hoodie_is_deleted", "id", "creation_date", "last_update_time", "version", "value")

   // writing the hudi table the first time
   initialInputDf.write
     .format("org.apache.hudi")
     .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .options(hudiOptions)
     .mode(SaveMode.Append)
     .save("s3://<path_to_table>/my_table/")

   // upsert table
   val update_data = Seq(
     (false, "100", "2015-01-03", "2015-01-03T13:51:39.340396Z", 2, "another value"),
     (false, "101", "2015-01-03", "2015-01-03T12:14:58.597216Z", 6, "3rd value")
   ).toDF("_hoodie_is_deleted", "id", "creation_date", "last_update_time", "version", "value")

   update_data.write
     .format("org.apache.hudi")
     .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .options(hudiOptions)
     .mode(SaveMode.Append)
     .save("s3://<path_to_table>/my_table/")
   ```
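   To pick a timestamp that falls between the two writes, the completed commit 
instants can be listed from the table's timeline. A sketch of how we would do 
that with the `HoodieTableMetaClient` API, assuming the same placeholder base 
path as in the writes above (not verified output):

   ```scala
   // Sketch: list completed commit instants so a timestamp between the
   // initial write and the upsert can be chosen for the time travel query.
   // The base path is the same placeholder as used in the write calls.
   import org.apache.hudi.common.table.HoodieTableMetaClient
   import scala.collection.JavaConverters._

   val metaClient = HoodieTableMetaClient.builder()
     .setConf(spark.sparkContext.hadoopConfiguration)
     .setBasePath("s3://<path_to_table>/my_table/")
     .build()

   // Print the commit times of all completed instants on the active timeline.
   metaClient.getActiveTimeline
     .getCommitsTimeline
     .filterCompletedInstants
     .getInstants.iterator().asScala
     .foreach(instant => println(instant.getTimestamp))
   ```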
   
   We ran the time travel query with a timestamp between the initial commit 
and the upsert commit.
   
   ```scala
   spark.read
     .format("org.apache.hudi")
     .option("as.of.instant", "<time between initial and upsert commit>")
     .table("default.my_table")
     .show(false)
   ```
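   For completeness, the equivalent read by base path (rather than through the 
catalog table name) would look like the following sketch; the path and the 
instant are the same placeholders as above:

   ```scala
   // Sketch: the same time travel read issued against the base path instead
   // of the metastore table name. The instant stays a placeholder, e.g. a
   // timestamp in yyyyMMddHHmmss form.
   spark.read
     .format("org.apache.hudi")
     .option("as.of.instant", "<time between initial and upsert commit>")
     .load("s3://<path_to_table>/my_table/")
     .show(false)
   ```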
   
   What we expect here is the snapshot that contains only the initial data, 
but we only get the current snapshot, i.e. the updated data.
   With Hudi 0.10.1 this worked as expected; after the upgrade to 0.11.0 we 
see this new behavior, which we cannot explain.
   Maybe we are missing some configuration.
   
   Can you please look into this? It is holding us back from using Hudi in 
our company at all.
   Let us know if you need more information.
   
   Thank you for your help!
   
   
   
   **Expected behavior**
   
   The time travel query returns the table state as of the given time.
   
   **Environment Description**
   
   * Hudi version : 0.11.0
   
   * Spark version : We tried 3.2.0 (EMR) and 3.1.1 (Glue)
   
   * Hive version : 3.1.2
    
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   **Additional context**
   We are using the AWS Glue metastore (Glue Data Catalog).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to