nsivabalan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r923241495


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala:
##########
@@ -50,6 +51,10 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
classOf[ComplexKeyGenerator].getName,
+    DataSourceWriteOptions.OPERATION.key -> 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+    // HoodieWriteConfig.BULK_INSERT_WRITE_STREAM_ENABLE.key -> "true",

Review Comment:
   does tests need more fixes? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, 
UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   may I know why we are disabling row writer for bulk_insert?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, 
UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = 
sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = 
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(),
 classOf[HoodieCommitMetadata])
+        val lastCheckpoint = 
commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)

Review Comment:
   not required in this patch. But do we need to design/impl this similar to 
how deltastreamer checkpointing is done. with Deltastreamer, its feasible to do 
1 writer w/ DS and another writer w/ Spark datasource and still Deltastreamer 
will be able to fetch the right checkpoint to resume from everytime. 
   Here I see, we are fetching only the latest commit. So this may not work w/ 
multi -writer scenarios. may be we can create a follow up ticket and work on it 
rather than expanding the scope of this patch. 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, 
UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = 
sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = 
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(),
 classOf[HoodieCommitMetadata])
+        val lastCheckpoint = 
commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, 
writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, 
clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = 
metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   and I guess, this batchId to commit metadata is not applicable for 
`STREAMING_IGNORE_FAILED_BATCH` config being set to true? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, 
UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = 
sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = 
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(),
 classOf[HoodieCommitMetadata])
+        val lastCheckpoint = 
commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, 
writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, 
clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = 
metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   btw, instead of fixing it here, do you think we can do it in L157 (old code) 
or L212 as per this patch. Or since this is the only place where commit 
actually got succeeded in hudi and we will have commit metadata to update the 
batchId, let me know. it makes sense. 
   
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, 
UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+      updatedOptions = updatedOptions.updated(ENABLE_ROW_WRITER.key, "false")
+    }
+
+    val queryId = 
sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    assert(queryId != null)
+    log.warn(s"Query id: $queryId")
+
+    if (metaClient != null) {
+      val lastCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      if (lastCommit.isPresent) {
+        val commitMetadata = 
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastCommit.get()).get(),
 classOf[HoodieCommitMetadata])
+        val lastCheckpoint = 
commitMetadata.getMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)
+        if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+          latestBatchId = lastCheckpoint.toLong
+        }
+      }
+    }
+
+    if (latestBatchId >= batchId) {
+      log.warn(s"Skipping already completed batch $batchId in query $queryId")
+      return
+    }
 
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
           sqlContext, mode, updatedOptions, data, hoodieTableConfig, 
writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
-      ) match {
+      )
+      match {
         case Success((true, commitOps, compactionInstantOps, 
clusteringInstant, client, tableConfig)) =>
-          log.info(s"Micro batch id=$batchId succeeded"
-            + (commitOps.isPresent match {
-            case true => s" for commit=${commitOps.get()}"
-            case _ => s" with no new commits"
+          log.warn(s"Micro batch id=$batchId succeeded"
+            + (if (commitOps.isPresent) {
+            s" for commit=${commitOps.get()}"
+          } else {
+            s" with no new commits"
           }))
+          log.warn(s"Current value of latestBatchId: $latestBatchId")
+          log.warn(s"Setting latestBatchId to batchId $batchId")
+          latestBatchId = batchId
           writeClient = Some(client)
           hoodieTableConfig = Some(tableConfig)
+          metaClient = HoodieTableMetaClient.builder()
+            .setConf(sqlContext.sparkContext.hadoopConfiguration)
+            .setBasePath(client.getConfig.getBasePath)
+            .build()
+          // let's update batchId as checkpoint for this commit
+          if (commitOps.isPresent) {
+            val instant = 
metaClient.getActiveTimeline.getCompletedInstantForTimestamp(commitOps.get())

Review Comment:
   I am not very fond of the way we solve this, but guess there is not much we 
can do. will keep thinking if there is any better way to do this. But for now, 
will proceed on w/ the review.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -46,14 +48,23 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     with Serializable {
   @volatile private var latestBatchId = -1L
 
+  /*@transient private val hadoopConf = 
sqlContext.sparkSession.sessionState.newHadoopConf()

Review Comment:
   can we fix this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to