This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c3dacb6497b [SPARK-53207][SDP] Send Pipeline Event to Client Asynchronously
0c3dacb6497b is described below

commit 0c3dacb6497b098f4a1fc38d4574df44102fc02a
Author: Jacky Wang <jacky.w...@databricks.com>
AuthorDate: Tue Sep 2 22:27:35 2025 -0700

    [SPARK-53207][SDP] Send Pipeline Event to Client Asynchronously
    
    ### What changes were proposed in this pull request?
    
    Created `PipelineEventSender` to send pipeline events back to the client
    from a background thread so that pipeline execution is not blocked. New
    events are queued into the sender and processed sequentially. The sender
    waits until all queued events have been sent to the client before shutting
    down the processing loop.
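
    A minimal usage sketch of the intended lifecycle (illustrative only; the
    `responseObserver`, `sessionHolder`, and `pipelineEvents` values are assumed
    to come from the surrounding request handling, as in `PipelinesHandler`):

    ```scala
    import scala.util.Using

    // close() is invoked automatically and blocks until all queued events are sent
    Using.resource(new PipelineEventSender(responseObserver, sessionHolder)) { sender =>
      // sendEvent only enqueues; delivery happens on a background thread
      pipelineEvents.foreach(sender.sendEvent)
    }
    ```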
    
    Implemented logical capacity checking that examines the event type before
    submission: RunProgress events and terminal FlowProgress events are always
    queued, while other events may be dropped when the queue is full. This
    prevents the buffer from overflowing and pipeline execution from being
    impacted when the number of events is very large.
    
    The queue size can be controlled by the Spark conf
    `spark.sql.pipelines.event.queue.capacity`, which currently defaults to
    `1000`.
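
    For example, a larger queue could be configured before running the pipeline
    (assuming an active `SparkSession` named `spark`):

    ```scala
    spark.conf.set("spark.sql.pipelines.event.queue.capacity", "2000")
    ```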
    
    ### Why are the changes needed?
    
    To ensure that event sending does not block pipeline execution.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New and existing tests ensure that events are delivered without blocking
    pipeline execution.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51956 from JiaqiWang18/SPARK-53207-async-event-response.
    
    Lead-authored-by: Jacky Wang <jacky.w...@databricks.com>
    Co-authored-by: Jacky Wang <jacky200...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  10 +
 .../connect/pipelines/PipelineEventSender.scala    | 188 +++++++++++++++++
 .../sql/connect/pipelines/PipelinesHandler.scala   | 106 ++++------
 .../pipelines/PipelineEventSenderSuite.scala       | 231 +++++++++++++++++++++
 .../spark/sql/pipelines/common/GraphStates.scala   |   6 +
 5 files changed, 473 insertions(+), 68 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 86807914c236..48bbe09f1643 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -6207,6 +6207,16 @@ object SQLConf {
       .createWithDefault(2)
   }
 
+  val PIPELINES_EVENT_QUEUE_CAPACITY = {
+    buildConf("spark.sql.pipelines.event.queue.capacity")
+      .doc("Capacity of the event queue used in pipelined execution. When the queue is full, " +
+        "non-terminal FlowProgressEvents will be dropped.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(v => v > 0, "Event queue capacity must be positive.")
+      .createWithDefault(1000)
+  }
+
   val HADOOP_LINE_RECORD_READER_ENABLED =
     buildConf("spark.sql.execution.datasources.hadoopLineRecordReader.enabled")
       .internal()
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
new file mode 100644
index 000000000000..4f263ebe372c
--- /dev/null
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.pipelines
+
+import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.{Timestamp => ProtoTimestamp}
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
+import org.apache.spark.sql.connect.service.SessionHolder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.common.FlowStatus
+import org.apache.spark.sql.pipelines.logging.{FlowProgress, PipelineEvent, RunProgress}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Handles sending pipeline events to the client in a background thread. This prevents pipeline
+ * execution from blocking on streaming events.
+ */
+class PipelineEventSender(
+    responseObserver: StreamObserver[ExecutePlanResponse],
+    sessionHolder: SessionHolder)
+    extends Logging
+    with AutoCloseable {
+
+  private final val queueCapacity: Int =
+    sessionHolder.session.conf
+      .get(SQLConf.PIPELINES_EVENT_QUEUE_CAPACITY.key)
+      .toInt
+
+  // ExecutorService for background event processing
+  private val executor: ThreadPoolExecutor =
+    ThreadUtils.newDaemonSingleThreadExecutor(threadName =
+      s"PipelineEventSender-${sessionHolder.sessionId}")
+
+  /*
+   * Atomic flags to track the state of the sender
+   * - `isShutdown`: Indicates if the sender has been shut down, if true, no new events
+   *    can be accepted, and the executor will be shut down after processing all submitted events.
+   */
+  private val isShutdown = new AtomicBoolean(false)
+
+  /**
+   * Send an event async by submitting it to the executor, if the sender is not shut down.
+   * Otherwise, throws an IllegalStateException, to raise awareness of the shutdown state.
+   *
+   * For RunProgress events, we ensure they are always queued even if the queue is full. For other
+   * events, we may drop them if the queue is at capacity to prevent blocking.
+   */
+  def sendEvent(event: PipelineEvent): Unit = synchronized {
+    if (!isShutdown.get()) {
+      if (shouldEnqueueEvent(event)) {
+        executor.submit(new Runnable {
+          override def run(): Unit = {
+            try {
+              sendEventToClient(event)
+            } catch {
+              case NonFatal(e) =>
+                logError(
+                  log"Failed to send pipeline event to client: " +
+                    log"${MDC(LogKeys.ERROR, event.message)}",
+                  e)
+            }
+          }
+        })
+      }
+    } else {
+      throw new IllegalStateException(
+        s"Cannot send event after shutdown for session ${sessionHolder.sessionId}")
+    }
+  }
+
+  private def shouldEnqueueEvent(event: PipelineEvent): Boolean = {
+    event.details match {
+      case _: RunProgress =>
+        // For RunProgress events, always enqueue event
+        true
+      case flowProgress: FlowProgress if FlowStatus.isTerminal(flowProgress.status) =>
+        // For FlowProgress events that are terminal, always enqueue event
+        true
+      case _ =>
+        // For other events, check if we have capacity
+        executor.getQueue.size() < queueCapacity
+    }
+  }
+
+  // Implementing AutoCloseable to allow for try-with-resources usage
+  // This will ensure that the sender is properly shut down and all resources are released
+  // without requiring explicit shutdown calls in user code.
+  override def close(): Unit = shutdown()
+
+  /**
+   * Shutdown the event sender, stop taking new events and wait for processing to complete. This
+   * method blocks until all queued events have been processed. Idempotent operation: calling this
+   * multiple times has no effect after the first call.
+   */
+  def shutdown(): Unit = {
+    if (isShutdown.compareAndSet(false, true)) {
+      // Request a shutdown of the executor which waits for all tasks to complete
+      executor.shutdown()
+      // Blocks until all tasks have completed execution after a shutdown request,
+      // disregard the timeout since we want all events to be processed
+      if (!executor.awaitTermination(Long.MaxValue, java.util.concurrent.TimeUnit.MILLISECONDS)) {
+        logError(
+          log"Pipeline event sender for session " +
+            log"${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)} failed to terminate")
+        executor.shutdownNow()
+      }
+      logInfo(
+        log"Pipeline event sender shutdown completed for session " +
+          log"${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}")
+    }
+  }
+
+  /**
+   * Send a single event to the client
+   */
+  private[connect] def sendEventToClient(event: PipelineEvent): Unit = {
+    try {
+      val protoEvent = constructProtoEvent(event)
+      responseObserver.onNext(
+        proto.ExecutePlanResponse
+          .newBuilder()
+          .setSessionId(sessionHolder.sessionId)
+          .setServerSideSessionId(sessionHolder.serverSessionId)
+          .setPipelineEventResult(proto.PipelineEventResult.newBuilder
+            .setEvent(protoEvent)
+            .build())
+          .build())
+    } catch {
+      case NonFatal(e) =>
+        logError(
+          log"Failed to send pipeline event to client: " +
+            log"${MDC(LogKeys.ERROR, event.message)}",
+          e)
+    }
+  }
+
+  private def constructProtoEvent(event: PipelineEvent): proto.PipelineEvent = {
+    val message = if (event.error.nonEmpty) {
+      // Returns the message associated with a Throwable and all its causes
+      def getExceptionMessages(throwable: Throwable): Seq[String] = {
+        throwable.getMessage +:
+          Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil)
+      }
+      val errorMessages = getExceptionMessages(event.error.get)
+      s"""${event.message}
+         |Error: ${errorMessages.mkString("\n")}""".stripMargin
+    } else {
+      event.message
+    }
+    val protoEventBuilder = proto.PipelineEvent
+      .newBuilder()
+      .setTimestamp(
+        ProtoTimestamp
+          .newBuilder()
+          // java.sql.Timestamp normalizes its internal fields: getTime() returns
+          // the full timestamp in milliseconds, while getNanos() returns the
+          // fractional seconds (0-999,999,999 ns). This ensures no precision is
+          // lost or double-counted.
+          .setSeconds(event.timestamp.getTime / 1000)
+          .setNanos(event.timestamp.getNanos)
+          .build())
+      .setMessage(message)
+    protoEventBuilder.build()
+  }
+}
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index b595ba2c501e..f35aee6a16eb 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.connect.pipelines
 
 import scala.jdk.CollectionConverters._
+import scala.util.Using
 
-import com.google.protobuf.{Timestamp => ProtoTimestamp}
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto
@@ -237,77 +237,47 @@ private[connect] object PipelinesHandler extends Logging {
       sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
     val tableFiltersResult = createTableFilters(cmd, graphElementRegistry, sessionHolder)
 
-    // We will use this variable to store the run failure event if it occurs. This will be set
-    // by the event callback.
-    @volatile var runFailureEvent = Option.empty[PipelineEvent]
-    // Define a callback which will stream logs back to the SparkConnect client when an internal
-    // pipeline event is emitted during pipeline execution. We choose to pass a callback rather the
-    // responseObserver to the pipelines execution code so that the pipelines module does not need
-    // to take a dependency on SparkConnect.
-    val eventCallback = { event: PipelineEvent =>
-      val message = if (event.error.nonEmpty) {
-        // Returns the message associated with a Throwable and all its causes
-        def getExceptionMessages(throwable: Throwable): Seq[String] = {
-          throwable.getMessage +:
-            Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil)
+    // Use the PipelineEventSender to send events back to the client asynchronously.
+    Using.resource(new PipelineEventSender(responseObserver, sessionHolder)) { eventSender =>
+      // We will use this variable to store the run failure event if it occurs. This will be set
+      // by the event callback.
+      @volatile var runFailureEvent = Option.empty[PipelineEvent]
+      // Define a callback which will stream logs back to the SparkConnect client when an internal
+      // pipeline event is emitted during pipeline execution. We choose to pass a callback rather
+      // the responseObserver to the pipelines execution code so that the pipelines module does not
+      // need to take a dependency on SparkConnect.
+      val eventCallback = { event: PipelineEvent =>
+        event.details match {
+          // Failed runs are recorded in the event log. We do not pass these to the SparkConnect
+          // client since the failed run will already result in an unhandled exception that is
+          // propagated to the SparkConnect client. This special handling ensures that the client
+          // does not see the same error twice for a failed run.
+          case RunProgress(state) if state == FAILED => runFailureEvent = Some(event)
+          case RunProgress(state) if state == CANCELED =>
+            throw new RuntimeException("Pipeline run was canceled.")
+          case _ =>
+            eventSender.sendEvent(event)
         }
-        val errorMessages = getExceptionMessages(event.error.get)
-        s"""${event.message}
-           |Error: ${errorMessages.mkString("\n")}""".stripMargin
-      } else {
-        event.message
       }
-      event.details match {
-        // Failed runs are recorded in the event log. We do not pass these to the SparkConnect
-        // client since the failed run will already result in an unhandled exception that is
-        // propagated to the SparkConnect client. This special handling ensures that the client
-        // does not see the same error twice for a failed run.
-        case RunProgress(state) if state == FAILED => runFailureEvent = Some(event)
-        case RunProgress(state) if state == CANCELED =>
-          throw new RuntimeException("Pipeline run was canceled.")
-        case _ =>
-          responseObserver.onNext(
-            proto.ExecutePlanResponse
-              .newBuilder()
-              .setSessionId(sessionHolder.sessionId)
-              .setServerSideSessionId(sessionHolder.serverSessionId)
-              .setPipelineEventResult(
-                proto.PipelineEventResult.newBuilder
-                  .setEvent(
-                    proto.PipelineEvent
-                      .newBuilder()
-                      .setTimestamp(
-                        ProtoTimestamp
-                          .newBuilder()
-                          // java.sql.Timestamp normalizes its internal fields: getTime() returns
-                          // the full timestamp in milliseconds, while getNanos() returns the
-                          // fractional seconds (0-999,999,999 ns). This ensures no precision is
-                          // lost or double-counted.
-                          .setSeconds(event.timestamp.getTime / 1000)
-                          .setNanos(event.timestamp.getNanos)
-                          .build())
-                      .setMessage(message)
-                      .build())
-                  .build())
-              .build())
+
+      val pipelineUpdateContext = new PipelineUpdateContextImpl(
+        graphElementRegistry.toDataflowGraph,
+        eventCallback,
+        tableFiltersResult.refresh,
+        tableFiltersResult.fullRefresh)
+      sessionHolder.cachePipelineExecution(dataflowGraphId, pipelineUpdateContext)
+
+      if (cmd.getDry) {
+        pipelineUpdateContext.pipelineExecution.dryRunPipeline()
+      } else {
+        pipelineUpdateContext.pipelineExecution.runPipeline()
       }
-    }
-    val pipelineUpdateContext = new PipelineUpdateContextImpl(
-      graphElementRegistry.toDataflowGraph,
-      eventCallback,
-      tableFiltersResult.refresh,
-      tableFiltersResult.fullRefresh)
-    sessionHolder.cachePipelineExecution(dataflowGraphId, pipelineUpdateContext)
-    if (cmd.getDry) {
-      pipelineUpdateContext.pipelineExecution.dryRunPipeline()
-    } else {
-      pipelineUpdateContext.pipelineExecution.runPipeline()
-    }
 
-    // Rethrow any exceptions that caused the pipeline run to fail so that the exception is
-    // propagated back to the SC client / CLI.
-    runFailureEvent.foreach { event =>
-      throw event.error.get
+      // Rethrow any exceptions that caused the pipeline run to fail so that the exception is
+      // propagated back to the SC client / CLI.
+      runFailureEvent.foreach { event =>
+        throw event.error.get
+      }
     }
   }
 
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
new file mode 100644
index 000000000000..fc3cce0f7459
--- /dev/null
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.pipelines
+
+import java.sql.Timestamp
+
+import io.grpc.stub.StreamObserver
+import org.mockito.{ArgumentCaptor, Mockito}
+import org.mockito.Mockito._
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.sql.classic.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.connect.service.SessionHolder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.common.FlowStatus
+import org.apache.spark.sql.pipelines.common.RunState.{COMPLETED, RUNNING}
+import org.apache.spark.sql.pipelines.logging.{EventDetails, EventLevel, FlowProgress, PipelineEvent, PipelineEventOrigin, RunProgress}
+
+class PipelineEventSenderSuite extends SparkDeclarativePipelinesServerTest with MockitoSugar {
+
+  private def createMockSetup(
+      queueSize: String = "1000"): (StreamObserver[ExecutePlanResponse], SessionHolder) = {
+    val mockObserver = mock[StreamObserver[ExecutePlanResponse]]
+    val mockSessionHolder = mock[SessionHolder]
+    when(mockSessionHolder.sessionId).thenReturn("test-session-id")
+    when(mockSessionHolder.serverSessionId).thenReturn("test-server-session-id")
+    val mockSession = mock[SparkSession]
+    val mockConf = mock[RuntimeConfig]
+    when(mockSessionHolder.session).thenReturn(mockSession)
+    when(mockSession.conf).thenReturn(mockConf)
+    when(mockConf.get(SQLConf.PIPELINES_EVENT_QUEUE_CAPACITY.key))
+      .thenReturn(queueSize)
+    (mockObserver, mockSessionHolder)
+  }
+
+  private def createTestEvent(
+      id: String = "test-event-id",
+      message: String = "Test message",
+      level: EventLevel = EventLevel.INFO,
+      details: EventDetails = FlowProgress(FlowStatus.RUNNING)): PipelineEvent = {
+    PipelineEvent(
+      id = id,
+      timestamp = new Timestamp(System.currentTimeMillis()),
+      origin = PipelineEventOrigin(
+        flowName = Some("test-flow"),
+        datasetName = None,
+        sourceCodeLocation = None),
+      level = level,
+      message = message,
+      details = details,
+      error = None)
+  }
+
+  test("PipelineEventSender sends events") {
+    val (mockObserver, mockSessionHolder) = createMockSetup()
+
+    val eventSender = new PipelineEventSender(mockObserver, mockSessionHolder)
+
+    try {
+      val testEvent = createTestEvent()
+      eventSender.sendEvent(testEvent)
+
+      // Verify that onNext was called on the observer
+      val responseCaptor = ArgumentCaptor.forClass(classOf[ExecutePlanResponse])
+      verify(mockObserver, Mockito.timeout(1000)).onNext(responseCaptor.capture())
+
+      val response = responseCaptor.getValue
+      assert(response.getSessionId == "test-session-id")
+      assert(response.getServerSideSessionId == "test-server-session-id")
+      assert(response.hasPipelineEventResult)
+
+      val pipelineEvent = response.getPipelineEventResult.getEvent
+      assert(pipelineEvent.getMessage == "Test message")
+    } finally {
+      eventSender.shutdown()
+    }
+  }
+
+  test("PipelineEventSender graceful shutdown waits for previously queued events to process") {
+    val (mockObserver, mockSessionHolder) = createMockSetup()
+
+    val eventSender = new PipelineEventSender(mockObserver, mockSessionHolder)
+
+    val events = Seq(1, 2).map { i =>
+      createTestEvent(
+        id = s"event-$i",
+        message = s"Event $i before shutdown",
+        level = EventLevel.INFO,
+        details = FlowProgress(FlowStatus.RUNNING))
+    }
+
+    events.foreach(event => eventSender.sendEvent(event))
+
+    eventSender.shutdown() // blocks until all events are processed
+
+    // Event should have been processed before shutdown completed
+    val responseCaptor = ArgumentCaptor.forClass(classOf[ExecutePlanResponse])
+    verify(mockObserver, times(2)).onNext(responseCaptor.capture())
+
+    val responses = responseCaptor.getAllValues
+    assert(responses.size == 2)
+    assert(
+      responses.get(0).getPipelineEventResult.getEvent.getMessage == "Event 1 before shutdown")
+    assert(
+      responses.get(1).getPipelineEventResult.getEvent.getMessage == "Event 2 before shutdown")
+  }
+
+  test("PipelineEventSender throws exception on send after shutdown") {
+    val (mockObserver, mockSessionHolder) = createMockSetup()
+
+    val eventSender = new PipelineEventSender(mockObserver, mockSessionHolder)
+    eventSender.shutdown()
+    intercept[IllegalStateException] {
+      eventSender.sendEvent(createTestEvent())
+    }
+  }
+
+  test("PipelineEventSender drops events after reaching capacity") {
+    // This test simulates a scenario where the event queue is full and verifies that
+    // events are dropped when the queue is at capacity, except for terminal FlowProgress events and
+    // RunProgress events which should always be queued.
+
+    // Start with queue size of 1 to test capacity handling
+    val (mockObserver, mockSessionHolder) = createMockSetup(queueSize = "1")
+
+    val eventSender = new PipelineEventSender(mockObserver, mockSessionHolder) {
+      override def sendEventToClient(event: PipelineEvent): Unit = {
+        Thread.sleep(2000) // Simulate processing time so that we can test queue capacity
+        super.sendEventToClient(event)
+      }
+    }
+    try {
+      // Send FlowProgress.STARTING event - should be sent
+      val startedEvent = createTestEvent(
+        id = "startedEvent",
+        message = "Flow a started",
+        details = FlowProgress(FlowStatus.STARTING))
+      eventSender.sendEvent(startedEvent)
+
+      // Send FlowProgress.RUNNING event - should be queued
+      val firstRunningEvent = createTestEvent(
+        id = "firstRunningEvent",
+        message = "Flow a running",
+        details = FlowProgress(FlowStatus.RUNNING))
+      eventSender.sendEvent(firstRunningEvent)
+
+      // Send FlowProgress.RUNNING event - should be discarded due to full queue
+      val secondRunningEvent = createTestEvent(
+        id = "secondRunningEvent",
+        message = "Flow a running",
+        details = FlowProgress(FlowStatus.RUNNING))
+      eventSender.sendEvent(secondRunningEvent)
+
+      // Send RunProgress.RUNNING event - should be queued and processed
+      val runProgressRunningEvent = createTestEvent(
+        id = "runProgressRunning",
+        message = "Update completed",
+        details = RunProgress(RUNNING))
+      eventSender.sendEvent(runProgressRunningEvent)
+
+      // Send FlowProgress.RUNNING event - should be discarded due to full queue
+      val thirdRunningEvent = createTestEvent(
+        id = "thirdRunningEvent",
+        message = "Flow a running",
+        details = FlowProgress(FlowStatus.RUNNING))
+      eventSender.sendEvent(thirdRunningEvent)
+
+      // Send FlowProgress.COMPLETED event - should be queued and processed
+      val completedEvent = createTestEvent(
+        id = "completed",
+        message = "Flow has completed",
+        details = FlowProgress(FlowStatus.COMPLETED))
+      eventSender.sendEvent(completedEvent)
+
+      // Send RunProgress.COMPLETED event - should be queued and processed
+      val runProgressCompletedEvent = createTestEvent(
+        id = "runProgressCompletedEvent",
+        message = "Update completed",
+        details = RunProgress(COMPLETED))
+      eventSender.sendEvent(runProgressCompletedEvent)
+
+      // Shutdown to ensure all queued events are processed
+      eventSender.shutdown()
+
+      val responseCaptor = ArgumentCaptor.forClass(classOf[ExecutePlanResponse])
+      verify(mockObserver, times(5)).onNext(responseCaptor.capture())
+      val responses = responseCaptor.getAllValues
+      assert(responses.size == 5)
+
+      // FlowProgress.STARTING should be processed immediately
+      assert(responses.get(0).getPipelineEventResult.getEvent.getMessage == startedEvent.message)
+
+      // First FlowProgress.RUNNING should be queued and processed
+      assert(
+        responses.get(1).getPipelineEventResult.getEvent.getMessage
+          == firstRunningEvent.message)
+
+      // RunProgress.RUNNING should be queued and processed
+      assert(
+        responses.get(2).getPipelineEventResult.getEvent.getMessage
+          == runProgressRunningEvent.message)
+
+      // FlowProgress.COMPLETED event should also be processed because it is terminal
+      assert(
+        responses.get(3).getPipelineEventResult.getEvent.getMessage == completedEvent.message)
+
+      // RunProgress.COMPLETED event should also be processed
+      assert(
+        responses.get(4).getPipelineEventResult.getEvent.getMessage
+          == runProgressCompletedEvent.message)
+    } finally {
+      eventSender.shutdown()
+    }
+  }
+}
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/common/GraphStates.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/common/GraphStates.scala
index 6a4722ba8b3c..077453a5f5d3 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/common/GraphStates.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/common/GraphStates.scala
@@ -41,6 +41,12 @@ object FlowStatus {
   // This flow is idle because there are no updates to be made because all available data has
   // already been processed.
   case object IDLE extends FlowStatus
+
+  // check if the flow is in a terminal state
+  def isTerminal(status: FlowStatus): Boolean = status match {
+    case COMPLETED | FAILED | SKIPPED | STOPPED => true
+    case _ => false
+  }
 }
 
 sealed trait RunState


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
