hughfdjackson opened a new issue #1979: URL: https://github.com/apache/hudi/issues/1979
**Describe the problem you faced**

My team is interested in writing to Hudi tables using a repeated batch process that often upserts data identical to what is already there. For instance, we may be:

- recalculating the number of times a particular set of events has occurred
- re-running a query over the last week of data, to include potentially late-arriving data

We also have some consumers that want to consume these tables incrementally (to ingest the latest results into local databases, or to monitor the changes). Ideally, these consumers would only see the ~1% of records that have actually changed, rather than all records involved in the upsert. However, in our testing, the incremental query returns _all_ records that were involved in the upsert, even those that overwrote identical data. (As far as I can tell, this happens here: https://github.com/apache/hudi/blob/release-0.5.3/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java#L238-L244, no matter which `PAYLOAD_CLASS_OPT_KEY` class is used.)

**To Reproduce**

Steps to reproduce the behavior:

1. Clone the Hudi git repo, check out `release-0.5.3-rc2`, and run `mvn clean package -DskipTests -DskipITs`
2. Copy `packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.5.3-rc2.jar` to the EMR master node
3. Run the following Spark shell on the master node:

```
spark-shell \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --jars hudi-spark-bundle_2.11-0.5.3-rc2.jar,/usr/lib/spark/external/lib/spark-avro.jar \
  -i spark-shell-script
```

where the contents of `spark-shell-script` are:

```scala
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.DataFrame
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.table.HoodieTable
import org.apache.hudi.config.HoodieWriteConfig

// Helper functions
val basePath = "s3://{s3BucketNameAndPrefixPath}"
val tableName = "hudi_incremental_read_test"

def write(df: DataFrame, saveMode: SaveMode = Append) =
  df.write.format("hudi")
    .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
    .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
    .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
    .option("hoodie.consistency.check.enabled", "true")
    .option(TABLE_NAME, tableName)
    .mode(saveMode)
    .save(basePath)

def incrementalRead(beginInstant: String) = {
  println(s"READING FROM $beginInstant")
  spark.read
    .format("hudi")
    .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(BEGIN_INSTANTTIME_OPT_KEY, beginInstant)
    .load(basePath)
}

def latestCommitInstant() = {
  val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
  val hoodieTable = HoodieTable.getHoodieTable(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), spark.sparkContext)
  hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants().lastInstant.get.getTimestamp
}

def justBefore(commitTime: String) = (commitTime.toLong - 1).toString

val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
write(df, saveMode = Overwrite)

println(""" ----------- INCREMENTAL READ ------- """)
println("The whole table is new, so I'm expecting all 10 rows to be returned on incremental read")
incrementalRead(justBefore(latestCommitInstant)).show()

// generate an update for a single row
val updates = convertToStringList(dataGen.generateUpdates(1))
val updatesDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))

println(""" ----------- INCREMENTAL READ ------- """)
println("Now we're updating a row, we expect to see the updated row only on incremental read, which we do")
write(updatesDF)
incrementalRead(justBefore(latestCommitInstant)).show()

println(""" ----------- INCREMENTAL READ ------- """)
println("Re-upserting the same row twice causes it to be 'emitted' twice to the incremental reader, even though the contents of the second reading are identical to the first (metadata aside)")
write(updatesDF)
incrementalRead(justBefore(latestCommitInstant)).show()
```

That results in:

```
----------- INCREMENTAL READ -------
The whole table is new, so I'm expecting all 10 rows to be returned on incremental read
READING FROM 20200818091617
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|          begin_lat|          begin_lon|    driver|            end_lat|            end_lon|              fare|       partitionpath|    rider| ts|                uuid|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
|     20200818091618|  20200818091618_1_1|ecde6618-0cbc-4b6...|  americas/united_s...|3e9b3e64-3895-46a...|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|americas/united_s...|rider-213|0.0|ecde6618-0cbc-4b6...|
|     20200818091618|  20200818091618_1_2|c9a45eda-fe53-480...|  americas/united_s...|3e9b3e64-3895-46a...| 0.8742041526408587| 0.7528268153249502|driver-213| 0.9197827128888302|  0.362464770874404|19.179139106643607|americas/united_s...|rider-213|0.0|c9a45eda-fe53-480...|
|     20200818091618|  20200818091618_1_3|35808b31-2d1e-474...|  americas/united_s...|3e9b3e64-3895-46a...| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|americas/united_s...|rider-213|0.0|35808b31-2d1e-474...|
|     20200818091618|  20200818091618_1_4|67e1c9d5-a3c0-4f7...|  americas/united_s...|3e9b3e64-3895-46a...|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|americas/united_s...|rider-213|0.0|67e1c9d5-a3c0-4f7...|
|     20200818091618|  20200818091618_1_5|8fdf91c8-b0ca-46c...|  americas/united_s...|3e9b3e64-3895-46a...| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|americas/united_s...|rider-213|0.0|8fdf91c8-b0ca-46c...|
|     20200818091618|  20200818091618_0_1|2efbfbf1-aa1f-40f...|  americas/brazil/s...|a71d09b8-7cc8-408...| 0.4726905879569653|0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|americas/brazil/s...|rider-213|0.0|2efbfbf1-aa1f-40f...|
|     20200818091618|  20200818091618_0_2|2bbebad3-1a3c-4f1...|  americas/brazil/s...|a71d09b8-7cc8-408...| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|americas/brazil/s...|rider-213|0.0|2bbebad3-1a3c-4f1...|
|     20200818091618|  20200818091618_0_3|2c3d179c-899f-42f...|  americas/brazil/s...|a71d09b8-7cc8-408...| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|americas/brazil/s...|rider-213|0.0|2c3d179c-899f-42f...|
|     20200818091618|  20200818091618_2_1|3c9add87-8347-41d...|    asia/india/chennai|df2d7f47-0d10-43b...|  0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|  asia/india/chennai|rider-213|0.0|3c9add87-8347-41d...|
|     20200818091618|  20200818091618_2_2|8cd8ff41-791e-43a...|    asia/india/chennai|df2d7f47-0d10-43b...|   0.40613510977307| 0.5644092139040959|driver-213|  0.798706304941517|0.02698359227182834|17.851135255091155|  asia/india/chennai|rider-213|0.0|8cd8ff41-791e-43a...|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+

----------- INCREMENTAL READ -------
Now we're updating a row, we expect to see the updated row only on incremental read, which we do
20/08/18 09:17:36 WARN IncrementalTimelineSyncFileSystemView: Incremental Sync of timeline is turned off or deemed unsafe. Will revert to full syncing
READING FROM 20200818091705
+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|         begin_lat|         begin_lon|    driver|           end_lat|           end_lon|              fare|       partitionpath|    rider| ts|                uuid|
+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
|     20200818091706|  20200818091706_0_3|35808b31-2d1e-474...|  americas/united_s...|3e9b3e64-3895-46a...|0.7340133901254792|0.5142184937933181|driver-284|0.7814655558162802|0.6592596683641996|49.527694252432056|americas/united_s...|rider-284|0.0|35808b31-2d1e-474...|
+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+

----------- INCREMENTAL READ -------
Re-upserting the same row twice causes it to be 'emitted' twice to the incremental reader, even though the contents of the second reading are identical to the first (metadata aside)
20/08/18 09:18:04 WARN IncrementalTimelineSyncFileSystemView: Incremental Sync of timeline is turned off or deemed unsafe. Will revert to full syncing
READING FROM 20200818091736
+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|         begin_lat|         begin_lon|    driver|           end_lat|           end_lon|              fare|       partitionpath|    rider| ts|                uuid|
+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
|     20200818091737|  20200818091737_0_4|35808b31-2d1e-474...|  americas/united_s...|3e9b3e64-3895-46a...|0.7340133901254792|0.5142184937933181|driver-284|0.7814655558162802|0.6592596683641996|49.527694252432056|americas/united_s...|rider-284|0.0|35808b31-2d1e-474...|
+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
```

**Expected behavior**

Ideally (in our use case), upserting a row whose contents are identical to what is already stored would not cause an incremental reader to read the data again.

**Environment Description**

* Hudi version : 0.5.3-rc2, built from source
* Spark version : 2.4.4 (Scala 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252)
* Hive version : 2.3.6
* Hadoop version : 2.8.5-amzn-5
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
* EMR Version : emr-5.29.0
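As an addendum for readers hitting the same behaviour (this is not part of the original report): because `HoodieMergeHandle` stamps every merged record with the new commit time, and the incremental query appears to filter on `_hoodie_commit_time` alone, any upserted record reappears downstream regardless of payload class. One possible writer-side workaround is to filter no-op rows out of the incoming batch *before* upserting, so identical rows never receive a fresh commit time. The following is an untested Scala sketch, not a definitive fix; `spark`, `basePath`, `write`, and `updatesDF` are assumed from the reproduction script above, and the snapshot read path may need globbing on 0.5.x.

```scala
import org.apache.spark.sql.DataFrame

// Hudi metadata columns to ignore when comparing incoming rows to stored rows.
val hudiMetaCols = Seq("_hoodie_commit_time", "_hoodie_commit_seqno",
  "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

// Hypothetical helper (sketch only): drop rows from the incoming batch that
// are identical to what the table already holds, so they are never upserted.
def dropNoOpRows(incoming: DataFrame): DataFrame = {
  // Snapshot of the current table, minus Hudi metadata columns.
  // (On 0.5.x the datasource snapshot read may need a glob, e.g. basePath + "/*/*/*".)
  val current = spark.read.format("hudi").load(basePath).drop(hudiMetaCols: _*)
  // left_anti keeps only incoming rows with no exact match in `current`.
  // Columns that can be null would need null-safe comparison (<=>) instead.
  incoming.join(current, incoming.columns.toSeq, "left_anti")
}

// Usage: write(dropNoOpRows(updatesDF))
// Unchanged rows never re-enter the commit timeline, so incremental readers
// never see them again.
```

This trades one extra snapshot scan per batch for a quieter incremental stream, so it only helps when the writer can afford that scan; it does not change what Hudi itself emits for rows that do get upserted.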
