hughfdjackson opened a new issue #1979:
URL: https://github.com/apache/hudi/issues/1979


   **Describe the problem you faced**
   
   My team is interested in writing to Hudi tables using a repeated batch process that often upserts data identical to what's already there. For instance, we may be:
   
   - recalculating the number of times a particular set of events has occurred
   - re-running a query over the last week of data, to include potentially late-arriving data.
   
   We also have some consumers that want to consume these tables incrementally (to ingest the latest results into local databases, or to monitor changes). Ideally, these consumers would see only the ~1% of records that actually changed, rather than every record involved in the upsert.
   
   However, in our testing, the incremental query returns _all_ records that were involved in the upsert, even those that overwrote identical data.
   
   (As far as I can tell, this happens here: https://github.com/apache/hudi/blob/release-0.5.3/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java#L238-L244, no matter which `PAYLOAD_CLASS_OPT_KEY` class is used.)
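   To make the claim concrete, here is a toy model of that merge step (a deliberate simplification of the linked code; `Rec` and `merge` are hypothetical names, not Hudi classes):

   ```scala
   // Simplified model (NOT Hudi's actual code) of the merge behaviour at the
   // lines linked above: the payload combines the stored and incoming records,
   // and any non-empty result is rewritten into the new file slice stamped with
   // the new commit's timestamp -- even when the business data is unchanged.
   case class Rec(key: String, value: String, commitTime: String)

   def merge(stored: Rec, incoming: Rec, newCommit: String): Option[Rec] = {
     // OverwriteWithLatestAvroPayload semantics: the incoming record wins outright
     val combined = incoming
     // the rewrite stamps the new commit time regardless of value equality, so an
     // incremental read beginning after stored.commitTime re-emits the record
     Some(combined.copy(commitTime = newCommit))
   }

   val stored   = Rec("a", "same", "001")
   val incoming = Rec("a", "same", "000")
   merge(stored, incoming, "002")  // => Some(Rec(a,same,002))
   ```

   Since the commit-time stamping happens after the payload has returned, this is why (as far as I can tell) no choice of payload class can suppress the re-emission.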
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. clone hudi git repo, checkout `release-0.5.3-rc2` and run `mvn clean 
package -DskipTests -DskipITs`
   2. Copy 
`packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.5.3-rc2.jar` to 
EMR master node
   3. Run the following spark shell on master, with the command: `spark-shell 
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf 
"spark.sql.hive.convertMetastoreParquet=false" --jars 
hudi-spark-bundle_2.11-0.5.3-rc2.jar,/usr/lib/spark/external/lib/spark-avro.jar 
-i spark-shell-script`
   
   where the contents of `spark-shell-script` are:
   ```scala
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.spark.sql.SaveMode
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.spark.sql.DataFrame
   import org.apache.hudi.common.table.HoodieTableMetaClient
   import org.apache.hudi.table.HoodieTable
   import org.apache.hudi.config.HoodieWriteConfig
     
   // Helper functions
   val basePath = "s3://{s3BucketNameAndPrefixPath}"
   val tableName = "hudi_incremental_read_test"
   def write(df: DataFrame, saveMode: SaveMode = Append) = df.write.format("hudi")
       .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
       .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
       .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
       .option("hoodie.consistency.check.enabled", "true")
       .option(TABLE_NAME, tableName)
       .mode(saveMode)
       .save(basePath)
   def incrementalRead(beginInstant: String) = { 
       println(s"READING FROM $beginInstant")   
       spark.read
        .format("hudi")
        .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
        .option(BEGIN_INSTANTTIME_OPT_KEY, beginInstant)
        .load(basePath)
   } 
   def latestCommitInstant() = {
     val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
     val hoodieTable = HoodieTable.getHoodieTable(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), spark.sparkContext)
     hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants().lastInstant.get.getTimestamp
   }
   
   def justBefore(commitTime: String) = (commitTime.toLong - 1).toString
   val dataGen = new DataGenerator
   val inserts = convertToStringList(dataGen.generateInserts(10))
   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   
   write(df, saveMode=Overwrite)
   
   println("""
   ----------- INCREMENTAL READ -------
   """)
   println("The whole table is new, so I'm expecting all 10 rows to be returned on incremental read")
   incrementalRead(justBefore(latestCommitInstant)).show()
   
   // generate an update for a single row
   val updates = convertToStringList(dataGen.generateUpdates(1))
   val updatesDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   
   println("""
   ----------- INCREMENTAL READ -------
   """)
   println("Now we're updating a row, we expect to see the updated row only on incremental read, which we do")
   write(updatesDF)
   incrementalRead(justBefore(latestCommitInstant)).show()
   
   println("""
   ----------- INCREMENTAL READ -------
   """)
   println("Re-upserting the same row twice causes it to be 'emitted' twice to the incremental reader, even though the contents of the second reading are identical from the first (metadata aside)")
   write(updatesDF)
   incrementalRead(justBefore(latestCommitInstant)).show()
   ```
   
   That results in: 
   
   ```
   ----------- INCREMENTAL READ -------
   The whole table is new, so I'm expecting all 10 rows to be returned on incremental read
   READING FROM 20200818091617
   +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
   |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|          begin_lat|          begin_lon|    driver|            end_lat|            end_lon|              fare|       partitionpath|    rider| ts|                uuid|
   +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
   |     20200818091618|  20200818091618_1_1|ecde6618-0cbc-4b6...|  americas/united_s...|3e9b3e64-3895-46a...|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|americas/united_s...|rider-213|0.0|ecde6618-0cbc-4b6...|
   |     20200818091618|  20200818091618_1_2|c9a45eda-fe53-480...|  americas/united_s...|3e9b3e64-3895-46a...| 0.8742041526408587| 0.7528268153249502|driver-213| 0.9197827128888302|  0.362464770874404|19.179139106643607|americas/united_s...|rider-213|0.0|c9a45eda-fe53-480...|
   |     20200818091618|  20200818091618_1_3|35808b31-2d1e-474...|  americas/united_s...|3e9b3e64-3895-46a...| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|americas/united_s...|rider-213|0.0|35808b31-2d1e-474...|
   |     20200818091618|  20200818091618_1_4|67e1c9d5-a3c0-4f7...|  americas/united_s...|3e9b3e64-3895-46a...|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|americas/united_s...|rider-213|0.0|67e1c9d5-a3c0-4f7...|
   |     20200818091618|  20200818091618_1_5|8fdf91c8-b0ca-46c...|  americas/united_s...|3e9b3e64-3895-46a...| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|americas/united_s...|rider-213|0.0|8fdf91c8-b0ca-46c...|
   |     20200818091618|  20200818091618_0_1|2efbfbf1-aa1f-40f...|  americas/brazil/s...|a71d09b8-7cc8-408...| 0.4726905879569653|0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|americas/brazil/s...|rider-213|0.0|2efbfbf1-aa1f-40f...|
   |     20200818091618|  20200818091618_0_2|2bbebad3-1a3c-4f1...|  americas/brazil/s...|a71d09b8-7cc8-408...| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|americas/brazil/s...|rider-213|0.0|2bbebad3-1a3c-4f1...|
   |     20200818091618|  20200818091618_0_3|2c3d179c-899f-42f...|  americas/brazil/s...|a71d09b8-7cc8-408...| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|americas/brazil/s...|rider-213|0.0|2c3d179c-899f-42f...|
   |     20200818091618|  20200818091618_2_1|3c9add87-8347-41d...|    asia/india/chennai|df2d7f47-0d10-43b...|  0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|  asia/india/chennai|rider-213|0.0|3c9add87-8347-41d...|
   |     20200818091618|  20200818091618_2_2|8cd8ff41-791e-43a...|    asia/india/chennai|df2d7f47-0d10-43b...|   0.40613510977307| 0.5644092139040959|driver-213|  0.798706304941517|0.02698359227182834|17.851135255091155|  asia/india/chennai|rider-213|0.0|8cd8ff41-791e-43a...|
   +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
   ----------- INCREMENTAL READ -------
   Now we're updating a row, we expect to see the updated row only on incremental read, which we do
   20/08/18 09:17:36 WARN IncrementalTimelineSyncFileSystemView: Incremental Sync of timeline is turned off or deemed unsafe. Will revert to full syncing
   READING FROM 20200818091705
   +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
   |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|         begin_lat|         begin_lon|    driver|           end_lat|           end_lon|              fare|       partitionpath|    rider| ts|                uuid|
   +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
   |     20200818091706|  20200818091706_0_3|35808b31-2d1e-474...|  americas/united_s...|3e9b3e64-3895-46a...|0.7340133901254792|0.5142184937933181|driver-284|0.7814655558162802|0.6592596683641996|49.527694252432056|americas/united_s...|rider-284|0.0|35808b31-2d1e-474...|
   +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
   ----------- INCREMENTAL READ -------
   Re-upserting the same row twice causes it to be 'emitted' twice to the incremental reader, even though the contents of the second reading are identical from the first (metadata aside)
   20/08/18 09:18:04 WARN IncrementalTimelineSyncFileSystemView: Incremental Sync of timeline is turned off or deemed unsafe. Will revert to full syncing
   READING FROM 20200818091736
   +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
   |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|         begin_lat|         begin_lon|    driver|           end_lat|           end_lon|              fare|       partitionpath|    rider| ts|                uuid|
   +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
   |     20200818091737|  20200818091737_0_4|35808b31-2d1e-474...|  americas/united_s...|3e9b3e64-3895-46a...|0.7340133901254792|0.5142184937933181|driver-284|0.7814655558162802|0.6592596683641996|49.527694252432056|americas/united_s...|rider-284|0.0|35808b31-2d1e-474...|
   +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+------------------+----------+------------------+------------------+------------------+--------------------+---------+---+--------------------+
   ```
   
   **Expected behavior**
   
   Ideally (in our use case), upserting a row whose contents are identical would not cause an incremental reader to read the data again.
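   As a possible workaround in the meantime (not something Hudi provides; `changedRowsOnly` is a hypothetical helper, and it assumes `spark`, `basePath`, `write`, and `updatesDF` from the script above): drop incoming rows that already exist verbatim in the table with a left-anti join, so only genuinely new or changed rows reach the upsert and hence the incremental reader. A sketch:

   ```scala
   import org.apache.spark.sql.DataFrame

   // Hypothetical pre-write filter: drop incoming rows that exactly match what
   // the table already holds, comparing on all business columns (Hudi's
   // _hoodie_* bookkeeping columns are dropped so metadata differences don't count).
   def changedRowsOnly(incoming: DataFrame): DataFrame = {
     val snapshot = spark.read.format("hudi").load(basePath)
     val current  = snapshot.drop(snapshot.columns.filter(_.startsWith("_hoodie_")): _*)
     // left_anti keeps only rows of `incoming` with no identical row in `current`
     incoming.join(current, incoming.columns.toSeq, "left_anti")
   }

   write(changedRowsOnly(updatesDF))  // upsert only the rows that actually changed
   ```

   This trades an extra snapshot read per batch for quieter incremental output, and it relies on exact column-for-column equality, so it may not suit every schema.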
   
   **Environment Description**
   
   * Hudi version : 0.5.3-rc2, built from source
   
   * Spark version : 2.4.4 (Using Scala version 2.11.12, OpenJDK 64-Bit Server 
VM, 1.8.0_252)
   
   * Hive version : 2.3.6
   
   * Hadoop version : 2.8.5-amzn-5
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   * EMR Version : emr-5.29.0

