This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 848e47a5d766 [SPARK-53737][SQL][SS] Add Real-time Mode trigger
848e47a5d766 is described below

commit 848e47a5d766b22ac28cd9e926fce2fe4f11de05
Author: Jerry Peng <[email protected]>
AuthorDate: Wed Oct 1 17:20:14 2025 -0700

    [SPARK-53737][SQL][SS] Add Real-time Mode trigger
    
    ### What changes were proposed in this pull request?
    
    Introduce a new trigger type for Real-time Mode (RTM) in Structured
    Streaming. This trigger is how users will enable a Structured Streaming
    query to run in Real-time Mode.
    
    Please note this PR only adds the trigger; users cannot yet run queries in
    Real-time Mode. The remaining functionality will come in later PRs.
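    
    For illustration only, a minimal sketch of how a query would opt into Real-time
    Mode once the mode becomes runnable; the rate source and console sink are
    placeholders, and only the Trigger.RealTime(...) call comes from this patch:
    
    ```scala
    import org.apache.spark.sql.streaming.Trigger
    
    // Hypothetical usage sketch: this PR only adds the trigger; running a query
    // in Real-time Mode is not yet supported. `spark` is an existing SparkSession.
    val query = spark.readStream
      .format("rate")                             // placeholder source
      .load()
      .writeStream
      .format("console")                          // placeholder sink
      .trigger(Trigger.RealTime("10 seconds"))    // new Real-time Mode trigger
      .start()
    ```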
    
    ### Why are the changes needed?
    
    This is the first PR in the effort to add Real-time Mode to Structured Streaming.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds a new trigger type to Structured Streaming. This change does
    not affect any existing behavior.
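    
    The new trigger is exposed through several Trigger.RealTime overloads added in
    this patch (no argument for the 5-minute default, a duration in milliseconds, a
    value with a TimeUnit, a Scala Duration, or an interval string), for example:
    
    ```scala
    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.Trigger
    
    Trigger.RealTime()                        // default batch duration of 5 minutes
    Trigger.RealTime(10000L)                  // batch duration in milliseconds
    Trigger.RealTime(10, TimeUnit.SECONDS)    // duration with an explicit TimeUnit
    Trigger.RealTime(10.seconds)              // scala.concurrent.duration.Duration
    Trigger.RealTime("10 seconds")            // interval string
    ```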
    
    ### How was this patch tested?
    
    Unit test added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52473 from jerrypeng/SPARK-53737.
    
    Authored-by: Jerry Peng <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 .../org/apache/spark/sql/streaming/Trigger.java    |  54 +++++++++++
 .../spark/sql/execution/streaming/Triggers.scala   |  42 ++++++++-
 .../sql/streaming/StreamRealTimeModeSuite.scala    | 101 +++++++++++++++++++++
 3 files changed, 196 insertions(+), 1 deletion(-)

diff --git a/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
index 6e3a93ba9454..8536df1ec74f 100644
--- a/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -22,10 +22,12 @@ import java.util.concurrent.TimeUnit;
 import scala.concurrent.duration.Duration;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
 import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
 import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
 import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger;
 
 /**
 * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
@@ -176,4 +178,56 @@ public class Trigger {
   public static Trigger Continuous(String interval) {
     return ContinuousTrigger.apply(interval);
   }
+
+  /**
+   * A trigger for real time mode, with batches at the specified duration.
+   *
+   */
+  @Experimental
+  public static Trigger RealTime(long batchDurationMs) {
+    return RealTimeTrigger.apply(batchDurationMs);
+  }
+
+  /**
+   * A trigger for real time mode, with batches at the specified duration.
+   *
+   */
+  @Experimental
+  public static Trigger RealTime(long batchDuration, TimeUnit timeUnit) {
+    return RealTimeTrigger.create(batchDuration, timeUnit);
+  }
+
+  /**
+   * A trigger for real time mode, with batches at the specified duration.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(Trigger.RealTime(10.seconds))
+   * }}}
+   */
+  @Experimental
+  public static Trigger RealTime(Duration batchDuration) {
+    return RealTimeTrigger.apply(batchDuration);
+  }
+
+  /**
+   * A trigger for real time mode, with batches at the specified duration.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.RealTime("10 seconds"))
+   * }}}
+   */
+  @Experimental
+  public static Trigger RealTime(String batchDuration) {
+    return RealTimeTrigger.apply(batchDuration);
+  }
+
+  /**
+   * A trigger for real time mode, with batches at the specified duration. The default
+   * duration is 5 minutes.
+   */
+  @Experimental
+  public static Trigger RealTime() {
+    return RealTimeTrigger.apply();
+  }
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index 7d8b33aa5e22..ea7100fcc23a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -19,9 +19,12 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.TimeUnit
 
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, MINUTES}
+
+import org.json4s.DefaultFormats
 
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
 import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.microsToMillis
 import org.apache.spark.sql.catalyst.util.SparkIntervalUtils
@@ -114,3 +117,40 @@ object ContinuousTrigger {
     ContinuousTrigger(convert(interval, unit))
   }
 }
+
+/**
+ * A [[Trigger]] that runs a query in real time mode.
+ * @param batchDurationMs
+ *   The duration of each batch in milliseconds. This must be strictly positive.
+ */
+@Experimental
+case class RealTimeTrigger(batchDurationMs: Long) extends Trigger {
+  require(batchDurationMs > 0, "the batch duration must be positive")
+
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+}
+
+@Experimental
+object RealTimeTrigger {
+  import Triggers._
+
+  def apply(): RealTimeTrigger = {
+    RealTimeTrigger(Duration(5, MINUTES))
+  }
+
+  def apply(batchDuration: String): RealTimeTrigger = {
+    RealTimeTrigger(convert(batchDuration))
+  }
+
+  def apply(batchDuration: Duration): RealTimeTrigger = {
+    RealTimeTrigger(convert(batchDuration))
+  }
+
+  def create(batchDuration: String): RealTimeTrigger = {
+    apply(batchDuration)
+  }
+
+  def create(batchDuration: Long, unit: TimeUnit): RealTimeTrigger = {
+    RealTimeTrigger(convert(batchDuration, unit))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
new file mode 100644
index 000000000000..e21355430213
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+
+class StreamRealTimeModeSuite extends StreamTest {
+
+  test("test trigger") {
+    def testTrigger(trigger: Trigger, actual: Long): Unit = {
+      val realTimeTrigger = trigger.asInstanceOf[RealTimeTrigger]
+      assert(
+        realTimeTrigger.batchDurationMs == actual,
+        s"Real time trigger duration should be ${actual} ms" +
+        s" but got ${realTimeTrigger.batchDurationMs} ms"
+      )
+    }
+
+    // test default
+    testTrigger(Trigger.RealTime(), 300000)
+
+    List(
+      ("1 second", 1000),
+      ("1 minute", 60000),
+      ("1 hour", 3600000),
+      ("1 day", 86400000),
+      ("1 week", 604800000)
+    ).foreach {
+      case (str, ms) =>
+        testTrigger(Trigger.RealTime(str), ms)
+        testTrigger(RealTimeTrigger(str), ms)
+        testTrigger(RealTimeTrigger.create(str), ms)
+
+    }
+
+    List(1000, 60000, 3600000, 86400000, 604800000).foreach { ms =>
+      testTrigger(Trigger.RealTime(ms), ms)
+      testTrigger(RealTimeTrigger(ms), ms)
+      testTrigger(new RealTimeTrigger(ms), ms)
+    }
+
+    List(
+      (Duration.apply(1000, "ms"), 1000),
+      (Duration.apply(60, "s"), 60000),
+      (Duration.apply(1, "h"), 3600000),
+      (Duration.apply(1, "d"), 86400000)
+    ).foreach {
+      case (duration, ms) =>
+        testTrigger(Trigger.RealTime(duration), ms)
+        testTrigger(RealTimeTrigger(duration), ms)
+        testTrigger(RealTimeTrigger(duration), ms)
+    }
+
+    List(
+      (1000, TimeUnit.MILLISECONDS, 1000),
+      (60, TimeUnit.SECONDS, 60000),
+      (1, TimeUnit.HOURS, 3600000),
+      (1, TimeUnit.DAYS, 86400000)
+    ).foreach {
+      case (interval, unit, ms) =>
+        testTrigger(Trigger.RealTime(interval, unit), ms)
+        testTrigger(RealTimeTrigger(interval, unit), ms)
+        testTrigger(RealTimeTrigger.create(interval, unit), ms)
+    }
+    // test invalid
+    List("-1", "0").foreach(
+      str =>
+        intercept[IllegalArgumentException] {
+          testTrigger(Trigger.RealTime(str), -1)
+          testTrigger(RealTimeTrigger.create(str), -1)
+        }
+    )
+
+    List(-1, 0).foreach(
+      duration =>
+        intercept[IllegalArgumentException] {
+          testTrigger(Trigger.RealTime(duration), -1)
+          testTrigger(RealTimeTrigger(duration), -1)
+        }
+    )
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
