xushiyan commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928170690


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -78,26 +92,49 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       log.error("Async clustering service shutdown unexpectedly")
       throw new IllegalStateException("Async clustering service shutdown 
unexpectedly")
     }
+
+    if (metaClient.isDefined && canSkipBatch(batchId)) {
+      val queryId = 
sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+      assert(queryId != null)

Review Comment:
   I would prefer using the validation utils here so that it throws with a helpful message.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, 
UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   Instead of using UPSERT_OPERATION_OPT_VAL directly, shouldn't we import a 
default value constant?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    
DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' 
option"))

Review Comment:
   Not sure why this creates a new Path and then calls toString on the next line.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    
DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' 
option"))

Review Comment:
   Also, the default value seems off. Did you intend to throw when 'path' is missing?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the 
microbatch has already been prcessed or not
+    val lastCommit = 
metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        
metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), 
classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {

Review Comment:
   nit: use StringUtils.nonEmpty() instead of !StringUtils.isNullOrEmpty()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to