Yohahaha commented on code in PR #2532:
URL: https://github.com/apache/fluss/pull/2532#discussion_r2757468359
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala:
##########
@@ -36,41 +35,41 @@ class FlussAppendPartitionReader(
private val logScanner =
table.newScan().project(projection).createLogScanner()
// Iterator for current batch of records
- private var currentRecords: java.util.Iterator[ScanRecord] = _
+ private var currentRecords: java.util.Iterator[ScanRecord] =
java.util.Collections.emptyIterator()
+
+ // The latest offset of fluss is -2
+ private var currentOffset: Long = flussPartition.startOffset.max(0L)
// initialize log scanner
initialize()
- override def next(): Boolean = {
- if (closed) {
- return false
- }
-
- // If we have records in current batch, return next one
- if (currentRecords != null && currentRecords.hasNext) {
- val scanRecord = currentRecords.next()
- currentRow = convertToSparkRow(scanRecord)
- return true
- }
-
- // Poll for more records
+ private def pollMoreRecords(): Unit = {
val scanRecords = logScanner.poll(POLL_TIMEOUT)
+ if ((scanRecords == null || scanRecords.isEmpty) && currentOffset <
flussPartition.stopOffset) {
+ throw new IllegalStateException(s"No more data from fluss server," +
+ s" but current offset $currentOffset not reach the stop offset
${flussPartition.stopOffset}")
Review Comment:
The poll timeout can now be explicitly configured, and its default value is 10s,
aligned with Flink.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]