vinothchandar commented on code in PR #6098:
URL: https://github.com/apache/hudi/pull/6098#discussion_r928369447


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -84,20 +96,62 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.name())
     // we need auto adjustment enabled for streaming sink since async table 
services are feasible within the same JVM.
     updatedOptions = 
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
+    // disable row writer bulk insert of write stream
+    if (options.getOrDefault(OPERATION.key, 
UPSERT_OPERATION_OPT_VAL).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {

Review Comment:
   Row writing is a top priority no? Love to understand this more.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -247,4 +285,18 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       writeClient = Option.empty
     }
   }
+
+  private def canSkipBatch(batchId: Long): Boolean = {
+    // get the latest checkpoint from the commit metadata to check if the 
microbatch has already been processed or not
+    val lastCommit = 
metaClient.get.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    if (lastCommit.isPresent) {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        
metaClient.get.getActiveTimeline.getInstantDetails(lastCommit.get()).get(), 
classOf[HoodieCommitMetadata])
+      val lastCheckpoint = commitMetadata.getMetadata(SinkCheckpointKey)
+      if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
+        latestBatchId = lastCheckpoint.toLong
+      }
+    }
+    latestBatchId >= batchId

Review Comment:
   +1 Might be good to make the data model support multiple values from day 1 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -48,12 +50,24 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
-    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
-  private val retryIntervalMs = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
-    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
-  private val ignoreFailedBatch = 
options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
-    
DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
+  private val tablePath = new Path(options.getOrElse("path", "Missing 'path' 
option"))
+  private var metaClient: Option[HoodieTableMetaClient] = {
+    try {
+      
Some(HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration).setBasePath(tablePath.toString).build())
+    } catch {
+      case _: TableNotFoundException =>
+        log.warn("Ignore TableNotFoundException as it is first microbatch.")
+        Option.empty
+    }
+  }
+  private val retryCnt = options.getOrDefault(STREAMING_RETRY_CNT.key,
+    STREAMING_RETRY_CNT.defaultValue).toInt
+  private val retryIntervalMs = 
options.getOrDefault(STREAMING_RETRY_INTERVAL_MS.key,
+    STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
+  private val ignoreFailedBatch = 
options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,

Review Comment:
   TBH I think we should make it fail by default and not ignore. The original 
author from Apple wanted it that way for their use case, but that probably does 
not make sense at this point anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to