This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c9456f6a7c6 [SPARK-55699][SS][FOLLOWUP] Inconsistent reading of 
LowLatencyClock when used together with ManualClock
7c9456f6a7c6 is described below

commit 7c9456f6a7c60cf878078fa589efea839b908515
Author: Yuchen Liu <[email protected]>
AuthorDate: Mon Mar 2 07:18:49 2026 +0900

    [SPARK-55699][SS][FOLLOWUP] Inconsistent reading of LowLatencyClock when 
used together with ManualClock
    
    ### What changes were proposed in this pull request?
    
    In the previous [PR](https://github.com/apache/spark/pull/54497), we update 
the signature of `nextWithTimeout` in `SupportsRealTimeRead`. However, there 
was a bug introduced in `LowLatencyMemoryStream` where we compared millisecond 
timestamp and nanosecond timestamp directly without conversion.
    
    This PR fixes this issue, and renamed the parameter to better prevent such 
issue from happening.
    
    ### Why are the changes needed?
    
    **Context of the previous PR:** There was an issue that RTM tests that use 
manual clock might stuck because manual clock advancement may happens right in 
between `LowLatencyReaderWrap` getting the reference time, and 
`nextWithTimeout` in each source getting the start time. `nextWithTimeout` may 
uses the already advanced time to time its wait. When this happens, since the 
manual clock may only be advanced once by the test, `nextWithTimeout` may never 
return, causing test timeout. This  [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54550 from eason-yuchen-liu/fixLowLatencyClockInconsistency2.
    
    Authored-by: Yuchen Liu 
<[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala  | 2 +-
 .../spark/sql/connector/read/streaming/SupportsRealTimeRead.java   | 6 +++---
 .../sql/execution/streaming/sources/LowLatencyMemoryStream.scala   | 7 +++----
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 77ebcb04f2f7..36aaf3e76ff9 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -105,7 +105,7 @@ private case class KafkaBatchPartitionReader(
   }
 
   override def nextWithTimeout(
-      startTime: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
+      startTimeMs: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
     if (!iteratorForRealTimeMode.isDefined) {
       logInfo(s"Getting a new kafka consuming iterator for 
${offsetRange.topicPartition} " +
         s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
index 1e939c14cdc9..5542781f333a 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
@@ -78,12 +78,12 @@ public interface SupportsRealTimeRead<T> extends 
PartitionReader<T> {
      * Alternative function to be called than next(), that proceed to the next 
record. The different
      * from next() is that, if there is no more records, the call needs to 
keep waiting until
      * the timeout.
-     * @param startTime the base time (milliseconds) the was used to calculate 
the timeout.
+     * @param startTimeMs the base time (milliseconds) the was used to 
calculate the timeout.
      *                  Sources should use it as the reference time to start 
waiting for the next
      *                  record instead of getting the latest time from 
LowLatencyClock.
-     * @param timeout if no result is available after this timeout 
(milliseconds), return
+     * @param timeoutMs if no result is available after this timeout 
(milliseconds), return
      * @return {@link RecordStatus} describing whether a record is available 
and its arrival time
      * @throws IOException
      */
-    RecordStatus nextWithTimeout(Long startTime, Long timeout) throws 
IOException;
+    RecordStatus nextWithTimeout(Long startTimeMs, Long timeoutMs) throws 
IOException;
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
index bc8f51c95861..50bff9735f62 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
@@ -278,21 +278,20 @@ class LowLatencyMemoryStreamPartitionReader(
     throw new IllegalStateException("Task context was not set!")
   }
   override def nextWithTimeout(
-      startTime: java.lang.Long, timeout: java.lang.Long): RecordStatus = {
+      startTimeMs: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
     // SPARK-55699: Use the reference time passed in by the caller instead of 
getting the latest
     // time from LowLatencyClock, to avoid inconsistent reading when 
LowLatencyClock is a
     // manual clock.
-    val startReadTime = startTime
     var elapsedTimeMs = 0L
     current = getRecordWithTimestamp
     while (current.isEmpty) {
       val POLL_TIME = 10L
-      if (elapsedTimeMs >= timeout) {
+      if (elapsedTimeMs >= timeoutMs) {
         return RecordStatus.newStatusWithoutArrivalTime(false)
       }
       Thread.sleep(POLL_TIME)
       current = getRecordWithTimestamp
-      elapsedTimeMs = (clock.nanoTime() - startReadTime) / 1000 / 1000
+      elapsedTimeMs = clock.getTimeMillis() - startTimeMs
     }
     currentOffset += 1
     RecordStatus.newStatusWithArrivalTimeMs(current.get._2)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to