ssdong opened a new issue #2818:
URL: https://github.com/apache/hudi/issues/2818


   **Describe the problem you faced**
   
   1. An incremental query throws `java.util.NoSuchElementException: No value present in Option` when the provided `beginInstantTime` and `endInstantTime` both fall between two consecutive instants on the active timeline of a MOR table, whereas a COW table returns an empty result.
   2. Are incremental changes potentially lost once the commit files have been archived?
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   ## For problem 1 (`NoSuchElementException`)
   1. Build the latest master locally and launch `spark-shell` with the resulting bundle:
   ```
   ./bin/spark-shell \
    --packages org.apache.spark:spark-avro_2.12:3.0.1 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=127.0.0.1:35005" \
    --conf "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=127.0.0.1:25005" \
    --jars ~/temp/repository/org/apache/hudi/hudi-spark3-bundle_2.12/0.9.0-SNAPSHOT/hudi-spark3-bundle_2.12-0.9.0-SNAPSHOT.jar
   ```
   2. Insert the first batch of records through: 
   ```
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import java.util.UUID
   
   val tableName = "hudi_date_cow"
   val basePath = "/Users/susu.dong/Dev/incremental-query-mor-test"
   val writeConfigs = Map(
        "hoodie.cleaner.incremental.mode" -> "true",
        "hoodie.insert.shuffle.parallelism" -> "2",
        "hoodie.datasource.write.recordkey.field" -> "id",
        "hoodie.datasource.write.precombine.field" -> "id",
        "hoodie.upsert.shuffle.parallelism" -> "2",
        "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
        "hoodie.clean.automatic" -> "false",
        "hoodie.table.name" -> tableName,
        "hoodie.keep.max.commits" -> "3",
        "hoodie.cleaner.commits.retained" -> "1",
        "hoodie.keep.min.commits" -> "2",
   )
   var seq = Seq(
       (0, "value", UUID.randomUUID.toString)
   )
   for(i <- 1 to 500) {
       seq :+= (i, "value", UUID.randomUUID.toString)
   }
   val df = seq.toDF("id", "string_column", "uuid")
   df.write.format("hudi").options(writeConfigs).mode(Overwrite).save(basePath)
   ```
   3. Update a few individual records through:
   ```
   val newSeq1 = Seq(
       (199, "test1", "c041b696-cd20-43aa-b835-eaa9cae603a3")
   )
   val newDf1 = newSeq1.toDF("id", "string_column", "uuid")
   newDf1.write.format("hudi").options(writeConfigs).mode(Append).save(basePath)
   
   
   val newSeq2 = Seq(
       (200, "test2", "fbf006a2-9cd6-11eb-a8b3-0242ac130003")
   )
   val newDf2 = newSeq2.toDF("id", "string_column", "uuid")
   newDf2.write.format("hudi").options(writeConfigs).mode(Append).save(basePath)
   
   
   val newSeq3 = Seq(
       (200, "test3", "d1ad944a-9cd6-11eb-a8b3-0242ac130003")
   )
   val newDf3 = newSeq3.toDF("id", "string_column", "uuid")
   newDf3.write.format("hudi").options(writeConfigs).mode(Append).save(basePath)
   ```
   4. Here are the current active timeline instants:
   <img width="1115" alt="Screen Shot 2021-04-14 at 14 24 07" src="https://user-images.githubusercontent.com/3754011/114658632-22ed5380-9d2d-11eb-8889-2308fb6ca2fe.png">
   
   5. Now query the table incrementally with carefully chosen `beginInstantTime` and `endInstantTime` values, e.g. `20210414131826` and `20210414131908`. The former is the timestamp of `20210414131826.deltacommit`, while the latter is the timestamp of `20210414131909.deltacommit` minus `1`. A `NoSuchElementException` is thrown:
   <img width="767" alt="Screen Shot 2021-04-14 at 14 28 49" src="https://user-images.githubusercontent.com/3754011/114659008-c2aae180-9d2d-11eb-80e9-af4bb4aadd7c.png">
   
   **_However, this is not an issue on a COW table, which returns an empty result like the following:_**
   <img width="895" alt="Screen Shot 2021-04-14 at 14 30 29" src="https://user-images.githubusercontent.com/3754011/114659137-fb4abb00-9d2d-11eb-87bb-a0c9ae84d3fd.png">
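
   To make the range in step 5 concrete, here is a minimal, Spark-free sketch. It assumes that an incremental query selects instants in the half-open range `(begin, end]` (begin exclusive, end inclusive). The object `InstantRangeSketch` and the helper `instantsInRange` are hypothetical names made up for illustration and are not part of Hudi's API. The timeline contains only the instants named in this report; the screenshots show the full list.

   ```scala
   // Hypothetical sketch, not Hudi code: models instant selection for an
   // incremental query under an assumed (begin, end] range semantics.
   object InstantRangeSketch {
     // Instant times are fixed-width yyyyMMddHHmmss strings, so lexicographic
     // comparison orders them chronologically.
     def instantsInRange(timeline: Seq[String], begin: String, end: String): Seq[String] =
       timeline.filter(ts => ts.compareTo(begin) > 0 && ts.compareTo(end) <= 0)

     // Only the instants mentioned in the text of this report.
     val timeline: Seq[String] =
       Seq("20210414131723", "20210414131826", "20210414131909")

     def main(args: Array[String]): Unit = {
       // begin is exactly one instant; end is the next instant minus 1.
       val selected = instantsInRange(timeline, "20210414131826", "20210414131908")
       println(selected.isEmpty)    // true: no instant falls inside the range
       println(selected.lastOption) // None: there is no "last instant" to read
     }
   }
   ```

   Under that assumption the range selects no instants at all. An unguarded `get` on an empty `Option` would then fail in the way the stack trace shows, although this sketch does not confirm that this is the actual cause inside `MergeOnReadIncrementalRelation`.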
   
   ### Should we expect the same behaviour for both COW and MOR?
   
   ## For problem 2 (Potential change data loss?)
   1. Continue with the exact same setup as in problem 1 and, this time, update another record:
   ```
   val newSeq4 = Seq(
       (202, "test4", "0e60fbca-9cd7-11eb-a8b3-0242ac130003")
   )
   val newDf4 = newSeq4.toDF("id", "string_column", "uuid")
   newDf4.write.format("hudi").options(writeConfigs).mode(Append).save(basePath)
   ```
   2. The 2 earliest active instants are now archived because `keep.max.commits` has been set to `3`.
   <img width="1109" alt="Screen Shot 2021-04-14 at 14 37 15" src="https://user-images.githubusercontent.com/3754011/114659827-3d283100-9d2f-11eb-8db6-e50fe651c793.png">
   
   3. Now run an incremental query that starts from the very first commit timestamp, `20210414131723` (which has been archived), minus `1`:
   ```
   val beginTime = "20210414131722"
   
   spark.read.format("hudi")
     .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
     .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime)
     .load(basePath).show()
   ```
   It gives:
   <img width="1027" alt="Screen Shot 2021-04-14 at 14 42 14" src="https://user-images.githubusercontent.com/3754011/114660061-a1e38b80-9d2f-11eb-849c-c2ecd55d96d0.png">
   
   4. The update to id `199`, `(199, "test1", "c041b696-cd20-43aa-b835-eaa9cae603a3")`, and the update to id `200`, `(200, "test3", "d1ad944a-9cd6-11eb-a8b3-0242ac130003")`, do _not_ show up in the query result. This means that if we continue from here and advance our `beginTime`, downstream consumers will never receive the update to `199` or the second update to `200`. Is this expected?
   
   
   **Environment Description**
   
   * Hudi version : latest master
   
   * Spark version : spark-3.0.1
   
   * Storage (HDFS/S3/GCS..) : local
   
   * Running on Docker? (yes/no) : no
   
   
   **Stacktrace**
   
   Stacktrace for the first problem:
   ```
   java.util.NoSuchElementException: No value present in Option
     at org.apache.hudi.common.util.Option.get(Option.java:88)
     at org.apache.hudi.MergeOnReadIncrementalRelation.buildFileIndex(MergeOnReadIncrementalRelation.scala:173)
     at org.apache.hudi.MergeOnReadIncrementalRelation.<init>(MergeOnReadIncrementalRelation.scala:79)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:123)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:65)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
     ... 91 elided
   ```
   

