rangareddy commented on issue #14363:
URL: https://github.com/apache/hudi/issues/14363#issuecomment-3587868812

   
   **Reproducible Scala code:**
   
   ```scala
   import org.apache.spark.sql._
   import org.apache.spark.sql.types._
   import scala.collection.JavaConverters._
   
   // Name and storage location of the Hudi table used by this reproduction.
   val tableName = "cdc_orders_test"
   val basePath = "/tmp/hudi"
   val tablePath = s"$basePath/$tableName"

   // Row layout shared by every DataFrame in this script. All five columns
   // are declared non-nullable; `policy_types` is an array of strings.
   val schema: StructType = new StructType()
     .add("order_id", IntegerType, nullable = false)
     .add("order_customer_id", IntegerType, nullable = false)
     .add("order_status", StringType, nullable = false)
     .add("order_date", IntegerType, nullable = false)
     .add("policy_types", ArrayType(StringType), nullable = false)
   
   // Ten sample orders (order_id 1 through 10). Each tuple lists its values
   // in the column order declared by `schema` and is turned into a Row.
   val initialData: Seq[Row] = Seq(
     (1, 101, "PENDING", 19723, Seq("AUTO", "HOME")),
     (2, 102, "PENDING", 19724, Seq("LIFE", "HEALTH")),
     (3, 103, "SHIPPED", 19725, Seq("AUTO")),
     (4, 104, "CANCELLED", 19726, Seq("TRAVEL", "LIFE")),
     (5, 105, "PENDING", 19727, Seq("HOME")),
     (6, 106, "DELIVERED", 19728, Seq("HEALTH", "AUTO")),
     (7, 107, "PENDING", 19729, Seq("TRAVEL")),
     (8, 108, "SHIPPED", 19730, Seq("AUTO", "LIFE")),
     (9, 109, "CANCELLED", 19731, Seq("HEALTH")),
     (10, 110, "DELIVERED", 19732, Seq("HOME", "TRAVEL"))
   ).map { case (orderId, customerId, status, orderDate, policyTypes) =>
     Row(orderId, customerId, status, orderDate, policyTypes)
   }

   // Distribute the rows as an RDD, then attach the schema to get a DataFrame.
   val dfInit: Dataset[Row] = {
     val initialRows = spark.sparkContext.parallelize(initialData)
     spark.createDataFrame(initialRows, schema)
   }
   dfInit.show()
   
   // Table identity and write behaviour: MERGE_ON_READ table, upsert
   // operation, keyed on order_id, with the non-partitioned key generator.
   val tableWriteOptions: Map[String, String] = Map(
     "hoodie.table.name" -> tableName,
     "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.datasource.write.recordkey.field" -> "order_id",
     "hoodie.datasource.write.precombine.field" -> "order_date",
     "hoodie.datasource.write.keygenerator.class" ->
       "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
   )

   // CDC-related table options.
   val cdcOptions: Map[String, String] = Map(
     "hoodie.table.cdc.enabled" -> "true",
     "hoodie.table.cdc.supplemental.logging.mode" -> "DATA_BEFORE_AFTER"
   )

   // Inline compaction options.
   val compactionOptions: Map[String, String] = Map(
     "hoodie.compact.inline" -> "true",
     "hoodie.compact.inline.max.delta.commits" -> "1"
   )

   // Full option set passed to every Hudi write in this script.
   val upsertHudiOptions: Map[String, String] =
     tableWriteOptions ++ cdcOptions ++ compactionOptions
   
   // Hudi writer pre-configured with the shared upsert options; callers pick
   // the save mode and trigger the write.
   def hudiWriter(df: Dataset[Row]): DataFrameWriter[Row] =
     df.write.format("hudi").options(upsertHudiOptions)

   // Despite the `empty` prefix this holds one placeholder row
   // (order_id 0, status "DUMMY") that seeds the table.
   val emptyData: Seq[Row] = Seq(Row(0, 101, "DUMMY", 19723, Seq("A", "B")))
   val emptyDf: Dataset[Row] = {
     val seedRows = spark.sparkContext.parallelize(emptyData)
     spark.createDataFrame(seedRows, schema)
   }

   // First write: Overwrite mode, so anything already at tablePath is replaced.
   hudiWriter(emptyDf).mode(SaveMode.Overwrite).save(tablePath)

   spark.read.format("hudi").load(tablePath).show()

   // Second write: append the ten sample orders to the same table.
   hudiWriter(dfInit).mode(SaveMode.Append).save(tablePath)
   
   // Get the first successful commit time (second commit after the dummy write).
   // Reads the distinct `_hoodie_commit_time` values present on the table's
   // rows, sorted ascending.
   val commitsDf: Dataset[Row] = spark.read
     .format("hudi")
     .load(tablePath)
     .select("_hoodie_commit_time")
     .distinct()
     .orderBy("_hoodie_commit_time")

   val commitList: List[String] = commitsDf.collect().map(_.getString(0)).toList

   // The first "real" insert is the second commit in the list (index 1) after
   // the dummy write (index 0). When the list has a single entry, that entry
   // is used instead.
   val beginInstant: String = commitList.lift(1).getOrElse(commitList.head)
   println(s"Begin Instant Time for Incremental Query (Initial Insert Commit): $beginInstant")
   
   // ------------------------------------------------------------------------------
   // 3. Perform subsequent inserts
   // ------------------------------------------------------------------------------
   // Four single-row writes follow, each with a new order_id (11 through 14)
   // and each saved as its own append to the table.

   val updateData1: Seq[Row] =
     Seq(Row(11, 111, "PENDING", 19733, Seq("LIFE", "TRAVEL")))
   val dfUpdate1: Dataset[Row] =
     spark.createDataFrame(spark.sparkContext.parallelize(updateData1), schema)
   dfUpdate1.write
     .format("hudi")
     .options(upsertHudiOptions)
     .mode(SaveMode.Append)
     .save(tablePath)

   val updateData2: Seq[Row] =
     Seq(Row(12, 111, "COMPLETED", 19739, Seq("MOTOR", "TRAVEL")))
   val dfUpdate2: Dataset[Row] =
     spark.createDataFrame(spark.sparkContext.parallelize(updateData2), schema)
   dfUpdate2.write
     .format("hudi")
     .options(upsertHudiOptions)
     .mode(SaveMode.Append)
     .save(tablePath)

   val updateData3: Seq[Row] =
     Seq(Row(13, 101, "COMPLETED", 19741, Seq("MOTOR", "CAR")))
   val dfUpdate3: Dataset[Row] =
     spark.createDataFrame(spark.sparkContext.parallelize(updateData3), schema)
   dfUpdate3.write
     .format("hudi")
     .options(upsertHudiOptions)
     .mode(SaveMode.Append)
     .save(tablePath)

   val updateData4: Seq[Row] =
     Seq(Row(14, 102, "COMPLETED", 19742, Seq("TRAVEL", "CAR")))
   val dfUpdate4: Dataset[Row] =
     spark.createDataFrame(spark.sparkContext.parallelize(updateData4), schema)
   dfUpdate4.write
     .format("hudi")
     .options(upsertHudiOptions)
     .mode(SaveMode.Append)
     .save(tablePath)

   // Show the table contents after all writes.
   spark.read.format("hudi").load(tablePath).show()
   
   // ------------------------------------------------------------------------------
   // 4. Incremental CDC read
   // ------------------------------------------------------------------------------
   println("\n=== Incremental CDC Query (Since Initial Insert) ===")

   // Incremental query in "cdc" format, with the begin instant set to the
   // commit time captured in `beginInstant`.
   val cdcDfSinceStart: Dataset[Row] = spark.read
     .format("hudi")
     .option("hoodie.datasource.query.type", "incremental")
     .option("hoodie.datasource.query.incremental.format", "cdc")
     .option("hoodie.datasource.read.begin.instanttime", beginInstant)
     .load(tablePath)

   // Print untruncated rows, then the schema of the CDC result.
   cdcDfSinceStart.show(false)
   cdcDfSinceStart.printSchema()
   
   // ------------------------------------------------------------------------------
   // 5. Incremental CDC read (all changes, ROW format)
   // ------------------------------------------------------------------------------
   println("Printing all changes since start (begin_instant '0') in ROW format...")
   println("\n=== Incremental CDC Query (Since Time '0', ROW Format) ===")

   // Same incremental "cdc" query as before, but with the begin instant set
   // to "0" and the CDC result format option set to "ROW".
   val cdcDfAllChanges: Dataset[Row] = spark.read
     .format("hudi")
     .option("hoodie.datasource.query.type", "incremental")
     .option("hoodie.datasource.query.incremental.format", "cdc")
     .option("hoodie.datasource.read.begin.instanttime", "0")
     .option("hoodie.datasource.query.incremental.cdc.result.format", "ROW")
     .load(tablePath)

   // Print untruncated rows, then the schema of the CDC result.
   cdcDfAllChanges.show(false)
   cdcDfAllChanges.printSchema()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to