This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8026f7d22248 [SPARK-55699][SS] Inconsistent reading of LowLatencyClock
when used together with ManualClock
8026f7d22248 is described below
commit 8026f7d222489a412bcb781f49d414302b593a9b
Author: Yuchen Liu <[email protected]>
AuthorDate: Fri Feb 27 13:21:03 2026 +0900
[SPARK-55699][SS] Inconsistent reading of LowLatencyClock when used
together with ManualClock
### What changes were proposed in this pull request?
This PR updates the signature of `nextWithTimeout` in Streaming Real-Time
Mode sources. We add a new parameter `startTime`, which is used to pass the
start time used by `LowLatencyReaderWrap` to each source. This ensures that
the starting time used by the streaming engine and the source connector are the
same.
### Why are the changes needed?
There was an issue that RTM tests that use manual clock might get stuck because
manual clock advancement may happen right in between `LowLatencyReaderWrap`
getting the reference time, and `nextWithTimeout` in each source getting the
start time. `nextWithTimeout` may use the already advanced time to time its
wait. When this happens, since the manual clock may only be advanced once by
the test, `nextWithTimeout` may never return, causing test timeout. This only
affects LowLatencyMemoryStr [...]
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54497 from eason-yuchen-liu/fixLowLatencyClockInconsistency.
Authored-by: Yuchen Liu
<[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala | 3 ++-
.../spark/sql/connector/read/streaming/SupportsRealTimeRead.java | 5 ++++-
.../sql/execution/datasources/v2/RealTimeStreamScanExec.scala | 2 +-
.../sql/execution/streaming/sources/LowLatencyMemoryStream.scala | 8 ++++++--
4 files changed, 13 insertions(+), 5 deletions(-)
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 9fcdf1a7d9bf..77ebcb04f2f7 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -104,7 +104,8 @@ private case class KafkaBatchPartitionReader(
}
}
- override def nextWithTimeout(timeoutMs: java.lang.Long): RecordStatus = {
+ override def nextWithTimeout(
+ startTime: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
if (!iteratorForRealTimeMode.isDefined) {
logInfo(s"Getting a new kafka consuming iterator for
${offsetRange.topicPartition} " +
s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
index 5bed945432c9..1e939c14cdc9 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
@@ -78,9 +78,12 @@ public interface SupportsRealTimeRead<T> extends
PartitionReader<T> {
* Alternative function to be called than next(), that proceed to the next
record. The different
* from next() is that, if there is no more records, the call needs to
keep waiting until
* the timeout.
* @param startTime the base time (milliseconds) that was used to calculate
the timeout.
+ * Sources should use it as the reference time to start
waiting for the next
+ * record instead of getting the latest time from
LowLatencyClock.
* @param timeout if no result is available after this timeout
(milliseconds), return
* @return {@link RecordStatus} describing whether a record is available
and its arrival time
* @throws IOException
*/
- RecordStatus nextWithTimeout(Long timeout) throws IOException;
+ RecordStatus nextWithTimeout(Long startTime, Long timeout) throws
IOException;
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
index 3432f28e12cc..9fff2d91af14 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
@@ -66,7 +66,7 @@ case class LowLatencyReaderWrap(
val ret = if (curTime >= lowLatencyEndTime) {
RecordStatus.newStatusWithoutArrivalTime(false)
} else {
- reader.nextWithTimeout(lowLatencyEndTime - curTime)
+ reader.nextWithTimeout(curTime, lowLatencyEndTime - curTime)
}
if (!ret.hasRecord) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
index 0e34b46a56c5..bc8f51c95861 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
@@ -277,8 +277,12 @@ class LowLatencyMemoryStreamPartitionReader(
if (TaskContext.get() == null) {
throw new IllegalStateException("Task context was not set!")
}
- override def nextWithTimeout(timeout: java.lang.Long): RecordStatus = {
- val startReadTime = clock.nanoTime()
+ override def nextWithTimeout(
+ startTime: java.lang.Long, timeout: java.lang.Long): RecordStatus = {
+ // SPARK-55699: Use the reference time passed in by the caller instead of
getting the latest
+ // time from LowLatencyClock, to avoid inconsistent reading when
LowLatencyClock is a
+ // manual clock.
+ val startReadTime = startTime
var elapsedTimeMs = 0L
current = getRecordWithTimestamp
while (current.isEmpty) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]