luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503105055

   @ad1happy2go 
   With spark.sql.codegen.wholeStage set to "false", records are lost when reading a MOR table (one with at least one log file).
   Reproducible code (Java):
   1. Run the application, creating a SparkSession with spark.sql.codegen.wholeStage set to "false":
   ```java
           SparkConf sparkConf = new SparkConf(false).setMaster("local[*]");
           SparkSession spark = SparkSession.builder()
                   .config(sparkConf)
                   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                   .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
                   .config("spark.sql.codegen.maxFields", "1000")
                   .config("spark.sql.codegen.wholeStage", "false")
                   .getOrCreate();
   ```
   2. Create tableA and insert 5 records:
   ```java
           Dataset<Row> dataset = spark.sql("select 1 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 2 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 3 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 4 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 5 id, 'mock' field1, 'mock' field2");
           String tableAPath = "file:/D:/test";
           dataset = dataset.withColumn("lake_update_date", functions.current_timestamp());
           dataset.write()
                   .format("org.apache.hudi")
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                   .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
                   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
                   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(HoodieWriteConfig.TABLE_NAME, "tableA")
                   .mode(SaveMode.Append)
                   .save(tableAPath);
           System.out.println(dataset.count());
   ```
   print -> 5 records
   3. Update 2 records:
   ```java
           Dataset<Row> dataset2 = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           dataset2 = dataset2.filter("id = 1 or id = 2").withColumn("field2", functions.lit("mock2"));
           dataset2.write()
                   .format("org.apache.hudi")
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                   .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
                   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
                   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(HoodieWriteConfig.TABLE_NAME, "tableA")
                   .option("hoodie.archival.rollback.instants.hours.retained", "10")
                   .mode(SaveMode.Append)
                   .save(tableAPath);
   ```
   4. Read tableA again and show the table count:
   ```java
           Dataset<Row> dataset3 = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           System.out.println(dataset3.count());
   ```
   print -> 4 records (1 record missing)
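   To pin down which record disappears, a small diagnostic can be appended here (hypothetical fragment, continuing the snippet above and reusing `dataset3` from it):
   ```java
           // Hypothetical diagnostic, not part of the original repro:
           // list the surviving record keys so the dropped id can be identified.
           dataset3.select("id").orderBy("id").show();
   ```
   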
   5. Rerun the application with a new SparkSession that does not set spark.sql.codegen.wholeStage to "false", and read tableA again:
   ```java
           SparkSession spark = SparkSession.builder()
                   .config(sparkConf)
                   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                   .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
                   .config("spark.sql.codegen.maxFields", "1000")
                   .getOrCreate();
           Dataset<Row> dataset = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           System.out.println(dataset.count());
   ```
   print -> 5 records (no record missing)
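   Since spark.sql.codegen.wholeStage is a runtime SQL conf, a full application restart may not be needed to get the correct count. A possible workaround sketch (untested configuration tweak, reusing the session and `tableAPath` from the steps above) is to re-enable whole-stage codegen on the existing session just before the snapshot read:
   ```java
           // Sketch, not verified against this bug: flip the runtime SQL conf back on
           // for the existing SparkSession instead of creating a new one.
           spark.conf().set("spark.sql.codegen.wholeStage", "true");
           Dataset<Row> reread = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           System.out.println(reread.count());
   ```
   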
   
   
   

