luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503572355
You can reproduce this issue by below code,
```
package test;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import java.io.IOException;
import java.nio.file.Files;
public class Test {
public static void main(String [] args) throws IOException {
//1、create sparkSession, set "spark.sql.codegen.wholeStage" to
"false".
SparkConf sparkConf = new SparkConf(false).setMaster("local[*]");
SparkSession spark = SparkSession.builder()
.config(sparkConf)
.config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.config("spark.sql.codegen.maxFields", "1000")
.config("spark.sql.codegen.wholeStage", "false")
.config("spark.driver.host", "localhost")
.getOrCreate();
//2、 Create tableA , And insert 5 records, then 1 base file generate
Dataset<Row> dataset = spark.sql("select 1 id, 'mock' field1, 'mock'
field2"
+ "\nunion all select 2 id, 'mock' field1, 'mock' field2"
+ "\nunion all select 3 id, 'mock' field1, 'mock' field2"
+ "\nunion all select 4 id, 'mock' field1, 'mock' field2"
+ "\nunion all select 5 id, 'mock' field1, 'mock' field2");
String tableAPath = Files.createTempDirectory("").toString();
dataset = dataset.withColumn("lake_update_date",
functions.current_timestamp());
dataset.write()
.format("org.apache.hudi")
.option(DataSourceWriteOptions.TABLE_TYPE().key(),
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
.option(DataSourceWriteOptions.OPERATION_OPT_KEY(),
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"id")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
.option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP,
"lake_update_date")
.option(HoodieWriteConfig.TABLE_NAME, "tableA")
.mode(SaveMode.Append)
.save(tableAPath);
System.out.println(dataset.count());//should be 5
//3、update 2 records in tableA, then 1 log file generate
Dataset<Row> dataset2 = spark.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
.load(tableAPath);
dataset2 = dataset2.filter("id = 1 or id = 2").withColumn("field2",
functions.lit("mock2").cast(DataTypes.StringType))
.withColumn("lake_update_date",
functions.current_timestamp());
dataset2.write()
.format("org.apache.hudi")
.option(DataSourceWriteOptions.TABLE_TYPE().key(),
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
.option(DataSourceWriteOptions.OPERATION_OPT_KEY(),
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),
"id")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
.option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP,
"lake_update_date")
.option(HoodieWriteConfig.TABLE_NAME, "tableA")
.mode(SaveMode.Append)
.save(tableAPath);
//4、read tableA again, And show the table count
Dataset<Row> dataset3 = spark.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
.load(tableAPath);
System.out.println(dataset3.count());//should be 4, 1 record was lost
//5、 create a new sparkSession without setting
"spark.sql.codegen.wholeStage", and read tableA again
SparkSession newSessionWithoutCodegenDisabled = spark.cloneSession();
newSessionWithoutCodegenDisabled.conf().unset("spark.sql.codegen.wholeStage");
Dataset<Row> newDataset =
newSessionWithoutCodegenDisabled.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
.load(tableAPath);
System.out.println(newDataset.count());//should be 5, no missing
}
}
```
As the comments in the code show: when whole-stage codegen is disabled, 1 record
is lost after writing into the MOR table and reading from it.
Once the option disabling whole-stage codegen is unset, the result is as expected.
We spent hours debugging this issue and found that in
[HoodieSparkUtils](https://github.com/apache/hudi/blob/release-0.12.1/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala)
line 102, "rows.isEmpty" is called whenever whole-stage codegen is disabled
or the number of fields in the dataset exceeds the value of the
"spark.sql.codegen.maxFields" option.
That call then invokes the
"[HoodieMergeOnReadRDD](https://github.com/apache/hudi/blob/release-0.12.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala).RecordMergingFileIterator.hasNext"
method, which is equivalent to the "hasNextInternal" method.
Inside "hasNextInternal", "baseFileIterator.next()" is called; we believe this
call belongs in "next()" instead, per the normal iterator contract ("hasNext"
should not consume elements).
Please help check and clarify whether our suspicion is correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]