This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8026f7d22248 [SPARK-55699][SS] Inconsistent reading of LowLatencyClock 
when used together with ManualClock
8026f7d22248 is described below

commit 8026f7d222489a412bcb781f49d414302b593a9b
Author: Yuchen Liu <[email protected]>
AuthorDate: Fri Feb 27 13:21:03 2026 +0900

    [SPARK-55699][SS] Inconsistent reading of LowLatencyClock when used 
together with ManualClock
    
    ### What changes were proposed in this pull request?
    
    This PR updates the signature of `nextWithTimeout` in Streaming Real-Time 
Mode sources. We add a new parameter `startTime`, which is used to pass the 
start time used by `LowLatencyReaderWrap` to each source. This ensures that 
the start time used by the streaming engine and by the source connector is the 
same.
    
    ### Why are the changes needed?
    
    There was an issue where RTM tests that use a manual clock might get stuck 
because the manual clock may be advanced right in between `LowLatencyReaderWrap` 
getting the reference time and `nextWithTimeout` in each source getting the 
start time. `nextWithTimeout` may then use the already-advanced time to time its 
wait. When this happens, since the manual clock may only be advanced once by 
the test, `nextWithTimeout` may never return, causing a test timeout. This only 
affects LowLatencyMemoryStr [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54497 from eason-yuchen-liu/fixLowLatencyClockInconsistency.
    
    Authored-by: Yuchen Liu 
<[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala | 3 ++-
 .../spark/sql/connector/read/streaming/SupportsRealTimeRead.java  | 5 ++++-
 .../sql/execution/datasources/v2/RealTimeStreamScanExec.scala     | 2 +-
 .../sql/execution/streaming/sources/LowLatencyMemoryStream.scala  | 8 ++++++--
 4 files changed, 13 insertions(+), 5 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 9fcdf1a7d9bf..77ebcb04f2f7 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -104,7 +104,8 @@ private case class KafkaBatchPartitionReader(
     }
   }
 
-  override def nextWithTimeout(timeoutMs: java.lang.Long): RecordStatus = {
+  override def nextWithTimeout(
+      startTime: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
     if (!iteratorForRealTimeMode.isDefined) {
       logInfo(s"Getting a new kafka consuming iterator for 
${offsetRange.topicPartition} " +
         s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
index 5bed945432c9..1e939c14cdc9 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
@@ -78,9 +78,12 @@ public interface SupportsRealTimeRead<T> extends 
PartitionReader<T> {
      * Alternative function to be called than next(), that proceed to the next 
record. The different
      * from next() is that, if there is no more records, the call needs to 
keep waiting until
      * the timeout.
+     * @param startTime the base time (milliseconds) the was used to calculate 
the timeout.
+     *                  Sources should use it as the reference time to start 
waiting for the next
+     *                  record instead of getting the latest time from 
LowLatencyClock.
      * @param timeout if no result is available after this timeout 
(milliseconds), return
      * @return {@link RecordStatus} describing whether a record is available 
and its arrival time
      * @throws IOException
      */
-    RecordStatus nextWithTimeout(Long timeout) throws IOException;
+    RecordStatus nextWithTimeout(Long startTime, Long timeout) throws 
IOException;
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
index 3432f28e12cc..9fff2d91af14 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
@@ -66,7 +66,7 @@ case class LowLatencyReaderWrap(
     val ret = if (curTime >= lowLatencyEndTime) {
       RecordStatus.newStatusWithoutArrivalTime(false)
     } else {
-      reader.nextWithTimeout(lowLatencyEndTime - curTime)
+      reader.nextWithTimeout(curTime, lowLatencyEndTime - curTime)
     }
 
     if (!ret.hasRecord) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
index 0e34b46a56c5..bc8f51c95861 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
@@ -277,8 +277,12 @@ class LowLatencyMemoryStreamPartitionReader(
   if (TaskContext.get() == null) {
     throw new IllegalStateException("Task context was not set!")
   }
-  override def nextWithTimeout(timeout: java.lang.Long): RecordStatus = {
-    val startReadTime = clock.nanoTime()
+  override def nextWithTimeout(
+      startTime: java.lang.Long, timeout: java.lang.Long): RecordStatus = {
+    // SPARK-55699: Use the reference time passed in by the caller instead of 
getting the latest
+    // time from LowLatencyClock, to avoid inconsistent reading when 
LowLatencyClock is a
+    // manual clock.
+    val startReadTime = startTime
     var elapsedTimeMs = 0L
     current = getRecordWithTimestamp
     while (current.isEmpty) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to