wuchong commented on code in PR #2532:
URL: https://github.com/apache/fluss/pull/2532#discussion_r2754066507


##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala:
##########
@@ -36,41 +35,41 @@ class FlussAppendPartitionReader(
   private val logScanner = 
table.newScan().project(projection).createLogScanner()
 
   // Iterator for current batch of records
-  private var currentRecords: java.util.Iterator[ScanRecord] = _
+  private var currentRecords: java.util.Iterator[ScanRecord] = 
java.util.Collections.emptyIterator()
+
+  // The latest offset of fluss is -2
+  private var currentOffset: Long = flussPartition.startOffset.max(0L)
 
   // initialize log scanner
   initialize()
 
-  override def next(): Boolean = {
-    if (closed) {
-      return false
-    }
-
-    // If we have records in current batch, return next one
-    if (currentRecords != null && currentRecords.hasNext) {
-      val scanRecord = currentRecords.next()
-      currentRow = convertToSparkRow(scanRecord)
-      return true
-    }
-
-    // Poll for more records
+  private def pollMoreRecords(): Unit = {
     val scanRecords = logScanner.poll(POLL_TIMEOUT)
+    if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < 
flussPartition.stopOffset) {
+      throw new IllegalStateException(s"No more data from fluss server," +
+        s" but current offset $currentOffset not reach the stop offset 
${flussPartition.stopOffset}")

Review Comment:
   `logScanner.poll()` may return empty results when the Fluss server is 
undergoing recovery, restart, or rebalance. Given that the current 
`POLL_TIMEOUT` is set to a very short duration (`100ms`), this scenario is 
highly likely to occur.  
   
   Currently, the source immediately throws an exception if `logScanner.poll()` 
returns no records, which makes it unstable during Fluss server failover events.
   
   A straightforward fix is to increase the `POLL_TIMEOUT` to **60 seconds**. 
This means the source will wait up to 60 seconds for data during transient 
server unavailability. If the Fluss server still hasn’t recovered within that 
window, we can then throw an exception to alert users.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -110,11 +155,12 @@ class FlussUpsertBatch(
     tablePath: TablePath,
     tableInfo: TableInfo,
     readSchema: StructType,
+    startOffsetsInitializer: OffsetsInitializer,

Review Comment:
   The `startOffsetsInitializer` is not used in `FlussUpsertBatch`. 



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala:
##########
@@ -60,6 +95,15 @@ case class FlussUpsertScan(
   extends FlussScan {
 
   override def toBatch: Batch = {
-    new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, 
flussConfig)
+    val startOffsetsInitializer = FlussScan.startOffsetsInitializer(options)

Review Comment:
   The `startOffsetsInitializer` is not used in `FlussUpsertBatch`. For 
`FlussUpsertBatch` (primary key tables), we currently only support the `FULL` 
startup mode, which reads the kv snapshot first and then switches to the 
corresponding log offsets. So we should check that the startup mode is `FULL` 
and otherwise throw an unsupported exception. 



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala:
##########
@@ -36,41 +35,41 @@ class FlussAppendPartitionReader(
   private val logScanner = 
table.newScan().project(projection).createLogScanner()
 
   // Iterator for current batch of records
-  private var currentRecords: java.util.Iterator[ScanRecord] = _
+  private var currentRecords: java.util.Iterator[ScanRecord] = 
java.util.Collections.emptyIterator()
+
+  // The latest offset of fluss is -2
+  private var currentOffset: Long = flussPartition.startOffset.max(0L)
 
   // initialize log scanner
   initialize()
 
-  override def next(): Boolean = {
-    if (closed) {
-      return false
-    }
-
-    // If we have records in current batch, return next one
-    if (currentRecords != null && currentRecords.hasNext) {
-      val scanRecord = currentRecords.next()
-      currentRow = convertToSparkRow(scanRecord)
-      return true
-    }
-
-    // Poll for more records
+  private def pollMoreRecords(): Unit = {
     val scanRecords = logScanner.poll(POLL_TIMEOUT)
+    if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < 
flussPartition.stopOffset) {
+      throw new IllegalStateException(s"No more data from fluss server," +
+        s" but current offset $currentOffset not reach the stop offset 
${flussPartition.stopOffset}")
+    }
+    currentRecords = scanRecords.records(tableBucket).iterator()
+  }
 
-    if (scanRecords == null || scanRecords.isEmpty) {
+  override def next(): Boolean = {
+    if (closed || currentOffset >= flussPartition.stopOffset) {
       return false
     }
 
-    // Get records for our bucket
-    val bucketRecords = scanRecords.records(tableBucket)
-    if (bucketRecords.isEmpty) {
-      return false
+    if (!currentRecords.hasNext) {
+      pollMoreRecords()
     }
 
-    currentRecords = bucketRecords.iterator()
+    // If we have records in current batch, return next one
     if (currentRecords.hasNext) {
       val scanRecord = currentRecords.next()
       currentRow = convertToSparkRow(scanRecord)
+      currentOffset += 1

Review Comment:
   We can't simply `+1`; we should use `currentOffset = scanRecord.logOffset() 
+ 1` instead, because some record batches advance the log offset without 
containing any records. Simply adding 1 will lead to missing some data. 



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala:
##########
@@ -83,10 +82,13 @@ class FlussAppendPartitionReader(
   }
 
   private def initialize(): Unit = {
+    logInfo(s"Prepare read table $tablePath partition $partitionId bucket 
$bucketId" +
+      s" with start offset ${flussPartition.startOffset} stop offset 
${flussPartition.stopOffset}")
     if (partitionId != null) {
-      logScanner.subscribeFromBeginning(partitionId, bucketId)
+      logScanner.subscribe(partitionId, bucketId, flussPartition.startOffset)
     } else {
-      logScanner.subscribeFromBeginning(bucketId)
+      logScanner.subscribe(bucketId, flussPartition.startOffset)
     }
+    pollMoreRecords()

Review Comment:
   When both the start and end offsets are set to `LATEST`, the stop offset may 
be less than or equal to the start offset. In such cases, attempting to poll 
records will cause `pollMoreRecords()` to throw an exception.  
   
   To avoid this, we should explicitly validate that the start offset is 
strictly less than the stop offset before initiating polling.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala:
##########
@@ -58,4 +58,15 @@ object SparkConnectorOptions {
 
   val SPARK_TABLE_OPTIONS: Seq[String] =
     Seq(PRIMARY_KEY, BUCKET_KEY, BUCKET_NUMBER, COMMENT).map(_.key)
+
+  object StartUpMode extends Enumeration {
+    val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+  }
+
+  val SCAN_START_UP_MODE: ConfigOption[String] =
+    ConfigBuilder
+      .key("scan.startup.mode")
+      .stringType()
+      .defaultValue(StartUpMode.LATEST.toString)

Review Comment:
   Should we use the default `FULL` mode to stay aligned with the Flink 
connector?  
   Using `LATEST` by default may result in empty results if the user doesn’t 
explicitly specify a startup mode for the query.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to