This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 58ec60d26974 [SPARK-38498][DSTREAM] Support customized StreamingListener by configuration
58ec60d26974 is described below
commit 58ec60d26974242245ec22de5e70ccf09942c42e
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Nov 4 11:04:43 2025 -0800
[SPARK-38498][DSTREAM] Support customized StreamingListener by configuration
### What changes were proposed in this pull request?
Currently, if a user wants to add a customized StreamingListener to a
StreamingContext, they have to register the listener in their own code:
```
streamingContext.addStreamingListener(listener)
```
With this PR, customized StreamingListeners can be defined by setting the conf
```
spark.streaming.extraListeners
```
The listener class can provide either of two constructors (see the sketch after this list):
1. a no-arg constructor
2. a constructor with a single `SparkConf` parameter
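As a minimal sketch of the second form (the class name `BatchLoggingListener`
is hypothetical and not part of this patch), such a listener could look like:
```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener using the single-SparkConf-parameter constructor form.
class BatchLoggingListener(conf: SparkConf) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Report each completed batch, tagged with the application name from the conf.
    val appName = conf.get("spark.app.name", "unknown")
    println(s"[$appName] completed batch at ${batchCompleted.batchInfo.batchTime}")
  }
}
```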
### Why are the changes needed?
Sometimes we want to attach a common StreamingListener to perform some
customized analysis. Doing this in the code of every job is noisy; with this
configuration, we can do it for all jobs at once by setting a common configuration.
### Does this PR introduce _any_ user-facing change?
Yes. Users can attach extra listeners to a StreamingContext by setting the configuration
```
spark.streaming.extraListeners
```
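For example, a minimal sketch of enabling a listener purely through
configuration (reusing the hypothetical `com.example.BatchLoggingListener`
from above):
```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("extra-listener-demo")
  // Comma-separated class names; com.example.BatchLoggingListener is hypothetical.
  .set("spark.streaming.extraListeners", "com.example.BatchLoggingListener")
val ssc = new StreamingContext(conf, Seconds(1))
// Per this patch, the configured listeners are instantiated and registered
// when ssc.start() is called.
```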
### How was this patch tested?
Added a unit test.
Closes #35799 from AngersZhuuuu/SPARK-38498.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8100636ab8707d068b02b9e7cd3bfe1b850ff86e)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/streaming/StreamingConf.scala | 7 ++++++
.../apache/spark/streaming/StreamingContext.scala | 28 ++++++++++++++++++----
.../spark/streaming/StreamingListenerSuite.scala | 27 ++++++++++++++++++++-
3 files changed, 57 insertions(+), 5 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
index bb80bd7072e8..39f3d495bdbf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
@@ -185,4 +185,11 @@ object StreamingConf {
.longConf
.createWithDefault(0)
+ private[streaming] val STREAMING_EXTRA_LISTENERS =
+ ConfigBuilder("spark.streaming.extraListeners")
+ .doc("Class names of streaming listeners to add to StreamingContext
during initialization.")
+ .version("4.1.0")
+ .stringConf
+ .toSequence
+ .createOptional
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 79bc38318f91..139b83ba0d07 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -41,12 +41,11 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingConf.STOP_GRACEFULLY_ON_SHUTDOWN
+import org.apache.spark.streaming.StreamingConf.{STOP_GRACEFULLY_ON_SHUTDOWN, STREAMING_EXTRA_LISTENERS}
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.
- {ExecutorAllocationManager, JobScheduler, StreamingListener, StreamingListenerStreamingStarted}
+import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener, StreamingListenerStreamingStarted}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -584,7 +583,7 @@ class StreamingContext private[streaming] (
validate()
registerProgressListener()
-
+ registerExtraStreamingListener()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
@@ -622,6 +621,27 @@ class StreamingContext private[streaming] (
}
}
+ /**
+ * Registers streaming listeners specified in spark.streaming.extraListeners.
+ */
+ private def registerExtraStreamingListener(): Unit = {
+ try {
+ conf.get(STREAMING_EXTRA_LISTENERS).foreach { classNames =>
+ val listeners = Utils.loadExtensions(classOf[StreamingListener], classNames, conf)
+ listeners.foreach { listener =>
+ addStreamingListener(listener)
+ logInfo(s"Registered streaming listener
${listener.getClass().getName()}")
+ }
+ }
+ } catch {
+ case e: Exception =>
+ try {
+ stop()
+ } finally {
+ throw new SparkException(s"Exception when registering StreamingListener", e)
+ }
+ }
+ }
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 63899f961e7b..fa560406ac3f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -32,8 +32,9 @@ import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
@@ -235,6 +236,19 @@ class StreamingListenerSuite extends TestSuiteBase with LocalStreamingContext wi
verifyNoMoreInteractions(streamingListener)
}
+ test("SPARK-38498: Support extra streaming listener") {
+ val conf = new SparkConf().setMaster("local").setAppName("customized streaming listener")
+ .set(UI.UI_ENABLED, false)
+ .set(StreamingConf.STREAMING_EXTRA_LISTENERS.key,
+ classOf[ExtraStreamingListener].getName)
+ val sc = new SparkContext(conf)
+ ssc = new StreamingContext(sc, Milliseconds(1000))
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count())
+ startStreamingContextAndCallStop(ssc)
+ assert(ExtraStreamingListenerBatchCounter.COMPLETED_BATCH >= 1)
+ }
+
private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = {
val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc)
_ssc.addStreamingListener(contextStoppingCollector)
@@ -386,3 +400,14 @@ class StreamingContextStoppingCollector(val ssc: StreamingContext) extends Strea
}
}
}
+
+class ExtraStreamingListener extends StreamingListener {
+ import ExtraStreamingListenerBatchCounter._
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ COMPLETED_BATCH += 1
+ }
+}
+
+object ExtraStreamingListenerBatchCounter {
+ var COMPLETED_BATCH = 0
+}