wuchong commented on code in PR #2532:
URL: https://github.com/apache/fluss/pull/2532#discussion_r2754066507
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala:
##########
@@ -36,41 +35,41 @@ class FlussAppendPartitionReader(
private val logScanner =
table.newScan().project(projection).createLogScanner()
// Iterator for current batch of records
- private var currentRecords: java.util.Iterator[ScanRecord] = _
+ private var currentRecords: java.util.Iterator[ScanRecord] = java.util.Collections.emptyIterator()
+
+ // The latest offset of fluss is -2
+ private var currentOffset: Long = flussPartition.startOffset.max(0L)
// initialize log scanner
initialize()
- override def next(): Boolean = {
- if (closed) {
- return false
- }
-
- // If we have records in current batch, return next one
- if (currentRecords != null && currentRecords.hasNext) {
- val scanRecord = currentRecords.next()
- currentRow = convertToSparkRow(scanRecord)
- return true
- }
-
- // Poll for more records
+ private def pollMoreRecords(): Unit = {
val scanRecords = logScanner.poll(POLL_TIMEOUT)
+ if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < flussPartition.stopOffset) {
+ throw new IllegalStateException(s"No more data from fluss server," +
+ s" but current offset $currentOffset not reach the stop offset
${flussPartition.stopOffset}")
Review Comment:
`logScanner.poll()` may return empty results when the Fluss server is
undergoing recovery, restart, or rebalance. Given that the current
`POLL_TIMEOUT` is set to a very short duration (`100ms`), this scenario is
highly likely to occur.
Currently, the source immediately throws an exception if `logScanner.poll()`
returns no records, which makes it unstable during Fluss server failover events.
A straightforward fix is to increase the `POLL_TIMEOUT` to **60 seconds**.
This means the source will wait up to 60 seconds for data during transient
server unavailability. If the Fluss server still hasn’t recovered within that
window, we can then throw an exception to alert users.
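Roughly what I have in mind, as a sketch only (reusing the reader's existing `logScanner`, `currentOffset`, `flussPartition`, `tableBucket` and `currentRecords` members; the 60-second budget is the value suggested above). It is written here as a bounded retry loop around the short poll, but simply raising `POLL_TIMEOUT` to 60 seconds would have a similar effect:
```scala
// Sketch only: keep polling through transient emptiness (recovery / restart /
// rebalance) and fail only after the overall wait budget is exhausted.
private val MAX_WAIT: java.time.Duration = java.time.Duration.ofSeconds(60)

private def pollMoreRecords(): Unit = {
  val deadline = System.nanoTime() + MAX_WAIT.toNanos
  var scanRecords = logScanner.poll(POLL_TIMEOUT)
  while ((scanRecords == null || scanRecords.isEmpty)
    && currentOffset < flussPartition.stopOffset
    && System.nanoTime() < deadline) {
    scanRecords = logScanner.poll(POLL_TIMEOUT)
  }
  if ((scanRecords == null || scanRecords.isEmpty)
    && currentOffset < flussPartition.stopOffset) {
    throw new IllegalStateException(
      s"No data from Fluss server within $MAX_WAIT, but current offset $currentOffset " +
        s"has not reached the stop offset ${flussPartition.stopOffset}")
  }
  if (scanRecords != null) {
    currentRecords = scanRecords.records(tableBucket).iterator()
  }
}
```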
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -110,11 +155,12 @@ class FlussUpsertBatch(
tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
+ startOffsetsInitializer: OffsetsInitializer,
Review Comment:
The `startOffsetsInitializer` is not used in `FlussUpsertBatch`.
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala:
##########
@@ -60,6 +95,15 @@ case class FlussUpsertScan(
extends FlussScan {
override def toBatch: Batch = {
- new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
+ val startOffsetsInitializer = FlussScan.startOffsetsInitializer(options)
Review Comment:
The `startOffsetsInitializer` is not used in `FlussUpsertBatch`. For
`FlussUpsertBatch` (primary key tables), we currently only support the `FULL`
startup mode, which reads the kv snapshot first and then switches to the
corresponding log offsets. So we should check that the startup mode is FULL
and otherwise throw an unsupported-operation exception, along the lines
sketched below.
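Something like this in `FlussUpsertScan.toBatch` (sketch only; `readStartupMode(options)` is a hypothetical helper that resolves `scan.startup.mode` from the scan options, and the message wording is just illustrative):
```scala
// Sketch only: primary-key (upsert) tables only support the FULL startup mode
// (kv snapshot first, then the corresponding log offsets), so reject any other
// configured mode up front.
override def toBatch: Batch = {
  val startupMode = readStartupMode(options) // hypothetical helper
  if (startupMode != StartUpMode.FULL) {
    throw new UnsupportedOperationException(
      s"Startup mode '$startupMode' is not supported for primary key tables, only FULL is supported.")
  }
  new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
}
```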
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala:
##########
@@ -36,41 +35,41 @@ class FlussAppendPartitionReader(
private val logScanner =
table.newScan().project(projection).createLogScanner()
// Iterator for current batch of records
- private var currentRecords: java.util.Iterator[ScanRecord] = _
+ private var currentRecords: java.util.Iterator[ScanRecord] = java.util.Collections.emptyIterator()
+
+ // The latest offset of fluss is -2
+ private var currentOffset: Long = flussPartition.startOffset.max(0L)
// initialize log scanner
initialize()
- override def next(): Boolean = {
- if (closed) {
- return false
- }
-
- // If we have records in current batch, return next one
- if (currentRecords != null && currentRecords.hasNext) {
- val scanRecord = currentRecords.next()
- currentRow = convertToSparkRow(scanRecord)
- return true
- }
-
- // Poll for more records
+ private def pollMoreRecords(): Unit = {
val scanRecords = logScanner.poll(POLL_TIMEOUT)
+ if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < flussPartition.stopOffset) {
+ throw new IllegalStateException(s"No more data from fluss server," +
+ s" but current offset $currentOffset not reach the stop offset
${flussPartition.stopOffset}")
+ }
+ currentRecords = scanRecords.records(tableBucket).iterator()
+ }
- if (scanRecords == null || scanRecords.isEmpty) {
+ override def next(): Boolean = {
+ if (closed || currentOffset >= flussPartition.stopOffset) {
return false
}
- // Get records for our bucket
- val bucketRecords = scanRecords.records(tableBucket)
- if (bucketRecords.isEmpty) {
- return false
+ if (!currentRecords.hasNext) {
+ pollMoreRecords()
}
- currentRecords = bucketRecords.iterator()
+ // If we have records in current batch, return next one
if (currentRecords.hasNext) {
val scanRecord = currentRecords.next()
currentRow = convertToSparkRow(scanRecord)
+ currentOffset += 1
Review Comment:
We can't simply do `+1`; we should use `currentOffset = scanRecord.logOffset() + 1`
instead, because some record batches advance log offsets without containing any
records. Simply adding 1 will lead to missing some data.
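i.e. the advance in `next()` would read roughly like this (sketch, using the `logOffset()` accessor mentioned above):
```scala
// Sketch: advance past the offset of the record actually returned, so batches
// that bump log offsets without emitting records are accounted for.
if (currentRecords.hasNext) {
  val scanRecord = currentRecords.next()
  currentRow = convertToSparkRow(scanRecord)
  currentOffset = scanRecord.logOffset() + 1
  return true
}
```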
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala:
##########
@@ -83,10 +82,13 @@ class FlussAppendPartitionReader(
}
private def initialize(): Unit = {
+ logInfo(s"Prepare read table $tablePath partition $partitionId bucket
$bucketId" +
+ s" with start offset ${flussPartition.startOffset} stop offset
${flussPartition.stopOffset}")
if (partitionId != null) {
- logScanner.subscribeFromBeginning(partitionId, bucketId)
+ logScanner.subscribe(partitionId, bucketId, flussPartition.startOffset)
} else {
- logScanner.subscribeFromBeginning(bucketId)
+ logScanner.subscribe(bucketId, flussPartition.startOffset)
}
+ pollMoreRecords()
Review Comment:
When both the start and end offsets are set to `LATEST`, the stop offset may
be less than or equal to the start offset. In such cases, attempting to poll
records will cause `pollMoreRecords()` to throw an exception.
To avoid this, we should explicitly validate that the start offset is
strictly less than the stop offset before initiating polling.
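For example, `initialize()` could guard the eager poll along these lines (sketch reusing the PR's fields; `currentOffset` is the start offset already clamped to be non-negative):
```scala
// Sketch: skip subscription and the eager poll when the split has nothing to
// read, e.g. when both the start and stop offsets resolve to LATEST.
private def initialize(): Unit = {
  logInfo(s"Prepare to read table $tablePath partition $partitionId bucket $bucketId" +
    s" with start offset ${flussPartition.startOffset} and stop offset ${flussPartition.stopOffset}")
  if (currentOffset >= flussPartition.stopOffset) {
    // empty split: leave currentRecords as the empty iterator so next() returns false
    return
  }
  if (partitionId != null) {
    logScanner.subscribe(partitionId, bucketId, flussPartition.startOffset)
  } else {
    logScanner.subscribe(bucketId, flussPartition.startOffset)
  }
  pollMoreRecords()
}
```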
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala:
##########
@@ -58,4 +58,15 @@ object SparkConnectorOptions {
val SPARK_TABLE_OPTIONS: Seq[String] =
Seq(PRIMARY_KEY, BUCKET_KEY, BUCKET_NUMBER, COMMENT).map(_.key)
+
+ object StartUpMode extends Enumeration {
+ val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+ }
+
+ val SCAN_START_UP_MODE: ConfigOption[String] =
+ ConfigBuilder
+ .key("scan.startup.mode")
+ .stringType()
+ .defaultValue(StartUpMode.LATEST.toString)
Review Comment:
Should we use the default `FULL` mode to stay aligned with the Flink
connector?
Using `LATEST` by default may result in empty results if the user doesn’t
explicitly specify a startup mode for the query.
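If we align on FULL, only the default of the option introduced above needs to change (sketch mirroring the definition from this diff):
```scala
// Sketch: default to FULL so queries that don't set scan.startup.mode read the
// existing table data instead of returning an empty result.
val SCAN_START_UP_MODE: ConfigOption[String] =
  ConfigBuilder
    .key("scan.startup.mode")
    .stringType()
    .defaultValue(StartUpMode.FULL.toString) // remainder of the definition unchanged
```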