This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fa8c4809affb [SPARK-56543] Add RTM stateless benchmark
fa8c4809affb is described below
commit fa8c4809affbeda8409e9f0c1c6f043e2729d4e6
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri May 22 10:33:32 2026 -0700
[SPARK-56543] Add RTM stateless benchmark
### What changes were proposed in this pull request?
Adds RTMKafkaKafkaBenchmark, a standalone benchmark program for the
Real-Time Mode (RTM) trigger in Structured Streaming. It is a stateless
end-to-end Kafka-to-Kafka latency benchmark.
The benchmark is implemented as an object extending
org.apache.spark.benchmark.BenchmarkBase, following the same convention as
other Spark benchmarks (e.g.
StateStoreBasicOperationsBenchmark, MapStatusesSerDeserBenchmark). It is
not a ScalaTest suite, so it is not discovered or executed by SBT test or Maven
surefire — it only runs when
invoked explicitly via runMain or spark-submit.
The benchmark:
1. Spins up a local-cluster Spark context (3 workers × 5 cores × 1024 MB
heap, giving every task of the 5-partition input topic its own core) and a live
embedded Kafka broker via KafkaTestUtils.
2. Generates synthetic records at 1,000 records/sec (recordsPerSecond)
into an input Kafka topic (5 partitions, numPartitions) from a background
producer thread.
3. Runs a stateless pipeline with RealTimeTrigger and a 5-minute trigger
window (batchDuration): reads from Kafka → base64-encodes the value → stamps a
source-timestamp header → writes to
an output Kafka topic.
4. Captures per-batch processing latency via Spark's observe() API
(logged on StreamingQueryProgress, not written to the result file).
5. After 4 batches complete (numBatches), reads back the output topic and
reports e2e latency percentiles (p0, p50, p90, p95, p99, p100) by comparing the
source-timestamp header to the Kafka sink timestamp. Records whose source
timestamp falls in the first batchDuration × numBatchesToFilter (5 minutes with
numBatchesToFilter = 1) are dropped as warmup, so roughly batches 2–4 are
counted.
6. Owns its own teardown via try { ... } finally { cleanup() } inside
runBenchmarkSuite, with an idempotent cleanup() that stops Spark and tears down
the embedded Kafka broker even if
setup partially fails, the streaming query times out, or post-run
analysis throws.
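Step 2 paces the generator against absolute deadlines (`nextDeadline = startTime + (i * delay)` in `genData`) rather than sleeping a fixed interval after each send. The sketch below shows that idea in hypothetical pure Scala. It is not the benchmark's code: the Kafka producer is replaced by an `emit` callback, and `PacedGenerator`, `intervalNs`, `sleepArgs`, and `run` are illustrative names.

```scala
import java.util.concurrent.TimeUnit

object PacedGenerator {
  // Nanoseconds between consecutive records for a target rate
  // (1,000,000 ns at 1,000 records/sec).
  def intervalNs(recordsPerSecond: Long): Long =
    TimeUnit.SECONDS.toNanos(1) / recordsPerSecond

  // Splits a nanosecond wait into the (millis, nanos) pair that
  // Thread.sleep(long, int) expects; nanos is always in [0, 999999].
  def sleepArgs(sleepNs: Long): (Long, Int) = {
    val ms = TimeUnit.NANOSECONDS.toMillis(sleepNs)
    val ns = (sleepNs - TimeUnit.MILLISECONDS.toNanos(ms)).toInt
    (ms, ns)
  }

  // Calls emit(i) for i = 1..count, each no earlier than startNs + i * interval.
  // Deadlines are absolute: if one emit runs late, the following records go out
  // back-to-back until the schedule catches up, so the average rate does not drift.
  def run(count: Int, recordsPerSecond: Long)(emit: Long => Unit): Long = {
    val interval = intervalNs(recordsPerSecond)
    val startNs = System.nanoTime()
    var i = 1L
    while (i <= count) {
      val deadline = startNs + i * interval
      var remaining = deadline - System.nanoTime()
      // Re-check after waking: older JDKs round the nanos argument of
      // Thread.sleep, so a single sleep can end slightly before the deadline.
      while (remaining > 0) {
        val (ms, ns) = sleepArgs(remaining)
        Thread.sleep(ms, ns)
        remaining = deadline - System.nanoTime()
      }
      emit(i)
      i += 1
    }
    System.nanoTime() - startNs // elapsed wall-clock time in ns
  }
}
```

Splitting the wait into milliseconds and a sub-millisecond remainder matches how `genData` calls `Thread.sleep(sleepTimeMs, sleepTimeNano)`.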
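The latency math in step 5 can be reproduced without Spark. This sketch assumes each output record has already been decoded into a hypothetical `SinkRecord(sourceTsMs, sinkTsMs)` pair. It drops records whose source timestamp falls within the warm-up window after the earliest one, as `getLatencies` does with `timeFilterThresholdMs`. It then formats the same six percentiles. The sketch uses exact nearest-rank percentiles, whereas the benchmark calls Spark's `percentile_approx`, so values can differ slightly.

```scala
object E2eLatency {
  // One output record, both in epoch milliseconds: the value of the
  // source-timestamp header and the Kafka timestamp of the output record.
  final case class SinkRecord(sourceTsMs: Long, sinkTsMs: Long)

  // Keeps records whose source timestamp is more than `warmupMs` after the
  // earliest source timestamp in the sink (same rule as `time > timeFilterThresholdMs`).
  def afterWarmup(records: Seq[SinkRecord], warmupMs: Long): Seq[SinkRecord] =
    if (records.isEmpty) records
    else {
      val minSource = records.map(_.sourceTsMs).min
      records.filter(r => r.sourceTsMs - minSource > warmupMs)
    }

  // Exact nearest-rank percentile over an ascending-sorted, non-empty sequence.
  def percentile(sorted: IndexedSeq[Long], p: Double): Long = {
    require(sorted.nonEmpty && p >= 0.0 && p <= 1.0)
    val rank = math.max(math.ceil(p * sorted.size).toInt, 1)
    sorted(rank - 1)
  }

  // Formats post-warm-up latencies (sink minus source) like the result file.
  def report(records: Seq[SinkRecord], warmupMs: Long): String = {
    val latencies =
      afterWarmup(records, warmupMs).map(r => r.sinkTsMs - r.sourceTsMs).sorted.toIndexedSeq
    require(latencies.nonEmpty, "no records left after warm-up filtering")
    val labels = Seq("p0", "p50", "p90", "p95", "p99", "p100")
    val fractions = Seq(0.0, 0.5, 0.9, 0.95, 0.99, 1.0)
    val lines = labels.zip(fractions).map { case (label, p) =>
      s"$label: ${percentile(latencies, p)}"
    }
    lines.mkString("\n")
  }
}
```

Because the filter is keyed on source timestamps rather than batch IDs, it drops "the first batch" only approximately. Records produced right at the warm-up boundary may fall on either side.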
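The teardown pattern in step 6 is shown below with a hypothetical `Stoppable` trait standing in for `SparkSession` and `KafkaTestUtils`, and a hypothetical `BenchmarkHarness` class. Each resource is stopped in its own `try`/`catch` and then cleared, so a failure stopping one does not skip the other, and a second `cleanup()` call is a no-op. Unlike the benchmark, which catches `Throwable`, this sketch catches only `NonFatal` exceptions.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for SparkSession / KafkaTestUtils: anything that can be shut down.
trait Stoppable { def stop(): Unit }

final class BenchmarkHarness(logWarning: String => Unit) {
  var spark: Stoppable = null
  var broker: Stoppable = null

  // Idempotent: each resource is stopped at most once, inside its own try/catch,
  // and then set to null, so a failing spark.stop() still lets the broker shut down.
  def cleanup(): Unit = {
    if (spark != null) {
      try {
        spark.stop()
      } catch {
        case NonFatal(e) => logWarning(s"Failed to stop spark: ${e.getMessage}")
      }
      spark = null
    }
    if (broker != null) {
      try {
        broker.stop()
      } catch {
        case NonFatal(e) => logWarning(s"Failed to stop broker: ${e.getMessage}")
      }
      broker = null
    }
  }

  // Mirrors runBenchmarkSuite: cleanup() runs whether the body succeeds or throws,
  // and the body's exception still propagates to the caller.
  def run(body: () => Unit): Unit =
    try {
      body()
    } finally {
      cleanup()
    }
}
```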
All knobs above (and the Kafka/streaming latency tuning options) are
declared as named private vals at the top of the object with a short rationale
per value.
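Several quantities in the benchmark follow directly from these knobs. The sketch below copies the default values and recomputes them. `RtmBenchmarkKnobs` and its `totalCores` parser for the `local-cluster[workers, cores, memoryMb]` master string are hypothetical helpers, not Spark APIs.

```scala
import scala.concurrent.duration._

object RtmBenchmarkKnobs {
  // Defaults copied from the private vals in RTMKafkaKafkaBenchmark.
  val batchDuration: FiniteDuration = 5.minutes
  val numBatches = 4
  val numBatchesToFilter = 1
  val recordsPerSecond = 1000L
  val numPartitions = 5
  val sparkMaster = "local-cluster[3, 5, 1024]"

  // Batches that contribute to the reported percentiles.
  val measuredBatches: Int = numBatches - numBatchesToFilter
  // Records produced per batch, ignoring generator jitter.
  val recordsPerBatch: Long = recordsPerSecond * batchDuration.toSeconds
  // Warm-up window (ms) dropped from the percentiles, as timeFilterThresholdMs.
  val warmupThresholdMs: Long = batchDuration.toMillis * numBatchesToFilter
  // Same formula as timeoutMs in benchmark(): 2x the expected run plus 60 s of slack.
  val timeoutMs: Long = numBatches * batchDuration.toMillis * 2 + 60 * 1000

  // Total executor cores for a local-cluster[workers, cores, memoryMb] master string.
  def totalCores(master: String): Int = {
    val LocalCluster = """local-cluster\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]""".r
    master match {
      case LocalCluster(workers, cores, _) => workers.toInt * cores.toInt
      case other => throw new IllegalArgumentException(s"not a local-cluster master: $other")
    }
  }
}
```

With the defaults, these values work out as follows:

- about 300,000 records per batch
- a 300,000 ms warm-up window
- 15 executor cores for 5 partitions
- a 2,460,000 ms (41-minute) timeout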
Sample benchmark results
```
Kafka to kafka query e2e_latency in milliseconds is
p0: 45
p50: 70
p90: 78
p95: 81
p99: 85
p100: 331
```
### Why are the changes needed?
There is currently no benchmark to measure RTM stateless Kafka-to-Kafka
latency. This makes it hard to quantify regressions or improvements to the RTM
code path during local development
or before merging changes. This benchmark provides a repeatable,
self-contained way to measure that, and follows the existing Spark benchmark
framework so result files can be committed
and diffed across runs.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A. Only a benchmark was added.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6 (claude-sonnet-4-6)
Closes #55420 from jerrypeng/SPARK-56543.
Authored-by: Boyang Jerry Peng <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../benchmarks/RTMKafkaKafkaBenchmark-results.txt | 14 +
.../benchmark/RTMKafkaKafkaBenchmark.scala | 436 +++++++++++++++++++++
2 files changed, 450 insertions(+)
diff --git a/connector/kafka-0-10-sql/benchmarks/RTMKafkaKafkaBenchmark-results.txt b/connector/kafka-0-10-sql/benchmarks/RTMKafkaKafkaBenchmark-results.txt
new file mode 100644
index 000000000000..3849db97e030
--- /dev/null
+++ b/connector/kafka-0-10-sql/benchmarks/RTMKafkaKafkaBenchmark-results.txt
@@ -0,0 +1,14 @@
+================================================================================================
+RTM stateless kafka-to-kafka
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.15+6-Ubuntu-0ubuntu120.04 on Linux 5.4.0-1157-aws-fips
+Intel(R) Xeon(R) 6975P-C
+Kafka to kafka query e2e_latency in milliseconds is
+p0: 43
+p50: 69
+p90: 78
+p95: 80
+p99: 84
+p100: 553
+
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/benchmark/RTMKafkaKafkaBenchmark.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/benchmark/RTMKafkaKafkaBenchmark.scala
new file mode 100644
index 000000000000..36d63d9a530a
--- /dev/null
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/benchmark/RTMKafkaKafkaBenchmark.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010.benchmark
+
+import java.io.File
+import java.util.{Properties, Timer, TimerTask}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import scala.concurrent.duration._
+
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord, RecordMetadata}
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.kafka010.KafkaTestUtils
+import org.apache.spark.sql.streaming.StreamingQueryListener
+import org.apache.spark.util.Utils
+
+/**
+ * Stateless Kafka-to-Kafka RTM benchmark. Reads from an input Kafka topic, applies a
+ * stateless transformation, and writes results to an output Kafka topic using
+ * [[RealTimeTrigger]]. After the run it reports e2e latency percentiles.
+ *
+ * The benchmark spins up a real local-cluster Spark context and a live embedded Kafka
+ * broker; with the defaults below (4 batches x 5 minutes) a single run takes at least 20 minutes.
+ *
+ * Unlike most Spark benchmarks, this one does not use `Benchmark.run()` / `addCase`: the
+ * metric of interest is end-to-end latency percentiles across a streaming pipeline, which
+ * does not fit the Best/Avg/Stdev table format. The JVM/OS/processor header that
+ * `Benchmark.run()` would normally emit is therefore written manually in
+ * `printLatenciesTable` for consistency with other benchmark result files.
+ *
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class>
+ * --jars <spark core test jar>,<spark sql test jar> <spark sql kafka 0-10 test jar>
+ * 2. build/sbt "sql-kafka-0-10/Test/runMain <this class>"
+ * 3. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql-kafka-0-10/Test/runMain <this class>"
+ * Results will be written to:
+ * "connector/kafka-0-10-sql/benchmarks/RTMKafkaKafkaBenchmark-results.txt".
+ * }}}
+ *
+ * See `benchmarks/RTMKafkaKafkaBenchmark-results.txt` for a recorded run.
+ */
+object RTMKafkaKafkaBenchmark extends BenchmarkBase with Logging {
+
+ // ----- Benchmark dimensions -----
+
+ // Duration of each RTM batch (passed as RealTimeTrigger(batchDurationMs)). Since RTM
+ // commits progress/checkpoints at batch boundaries, this also controls the effective
+ // checkpoint/progress cadence. A 5-minute duration is recommended.
+ // Lowering it checkpoints more frequently, which can increase latency.
+ private val batchDuration = 5.minutes
+
+ // Total number of batches to run before stopping. With numBatchesToFilter
+ // warm-up batches filtered out, (numBatches - numBatchesToFilter) batches
+ // contribute to the reported percentiles.
+ private val numBatches = 4
+
+ // Warm-up batches dropped from the percentile calculation to discount
+ // cold-start effects (JIT, executor warm-up, Kafka producer buffering).
+ private val numBatchesToFilter = 1
+
+ // Synthetic input throughput in records/second produced by the data generator
+ // thread into the input Kafka topic. Each record is a small string payload.
+ private val recordsPerSecond = 1000L
+
+ // ----- Spark topology -----
+
+ // local-cluster[N_WORKERS, CORES_PER_WORKER, HEAP_MB_PER_WORKER]. 3 workers x 5
+ // cores (15 in total) gives every task of the 5-partition input topic its own core;
+ // 1 GB heap is enough for the stateless transform.
+ private val sparkMaster = "local-cluster[3, 5, 1024]"
+
+ // Partition count on both the input and output topics.
+ // By default, Spark launches one task per partition.
+ // Make sure there are enough available slots in the cluster to schedule all tasks.
+ private val numPartitions = 5
+
+ // ----- Streaming + Kafka tuning (chosen for low latency, not throughput) -----
+
+ // How long the streaming engine waits between polls for new data. Kept low (10 ms)
+ // so RTM picks up new data with minimal delay.
+ private val streamingPollingDelayMs = 10
+
+ // Consumer-side: how long a fetch request blocks waiting for data on the broker
+ // before returning empty. Set low so a partition that's briefly empty does not
+ // delay the consumer for the default 500ms.
+ private val kafkaFetchMaxWaitMs = "10"
+
+ // Consumer-side: maximum bytes returned per partition per fetch. 10 MB lets a
+ // single fetch drain the whole batch of records produced during one trigger.
+ private val kafkaMaxPartitionFetchBytes = "10485760"
+
+ // Producer-side (Spark Kafka sink): total memory the client uses for batching
+ // unsent records. 64 MB keeps batching from blocking the writer under bursty load.
+ private val kafkaBufferMemoryBytes = "67108864"
+
+ // ----- Mutable state -----
+
+ private val topicId = new AtomicInteger(0)
+ private var spark: SparkSession = _
+ private var testUtils: KafkaTestUtils = _
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ // BenchmarkBase.main does not wrap this call in try/finally, so we must own
+ // teardown ourselves: partial setup, a timeout, or a getLatencies failure
+ // would otherwise leak the embedded Kafka broker and local-cluster workers.
+ testUtils = new KafkaTestUtils(Map.empty)
+ try {
+ testUtils.setup()
+ spark = SparkSession.builder()
+ .master(sparkMaster)
+ .appName(this.getClass.getCanonicalName)
+ .getOrCreate()
+ runBenchmark("RTM stateless kafka-to-kafka") {
+ benchmark()
+ }
+ } finally {
+ cleanup()
+ }
+ }
+
+ /**
+ * Idempotent cleanup of the Spark session and embedded Kafka broker. Safe to call
+ * after any combination of partial setup, normal completion, or exception.
+ */
+ private def cleanup(): Unit = {
+ if (spark != null) {
+ try {
+ spark.stop()
+ } catch {
+ case t: Throwable => logWarning("Failed to stop SparkSession during cleanup", t)
+ }
+ spark = null
+ }
+ if (testUtils != null) {
+ try {
+ testUtils.teardown()
+ } catch {
+ case t: Throwable => logWarning("Failed to teardown KafkaTestUtils during cleanup", t)
+ }
+ testUtils = null
+ }
+ }
+
+ private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+ /**
+ * Local equivalent of `SparkFunSuite.withTempDir`: creates a temp directory, passes it
+ * to `f`, and recursively deletes it afterward. We define it here because this benchmark
+ * extends `BenchmarkBase`, not a ScalaTest suite, so the standard helper is unavailable.
+ */
+ private def withTempDir[T](f: File => T): T = {
+ val dir = Utils.createTempDir()
+ try f(dir) finally {
+ Utils.deleteRecursively(dir)
+ }
+ }
+
+ def benchmark(): Unit = withTempDir { checkpointDir =>
+ val inputTopic = newTopic()
+ testUtils.createTopic(inputTopic, partitions = numPartitions)
+
+ val outputTopic = newTopic()
+ testUtils.createTopic(outputTopic, partitions = numPartitions)
+
+ spark.conf.set(SQLConf.STREAMING_POLLING_DELAY.key, streamingPollingDelayMs)
+
+ val kafkaStream = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", inputTopic)
+ .option("kafka.fetch.max.wait.ms", kafkaFetchMaxWaitMs)
+ .option("kafka.max.partition.fetch.bytes", kafkaMaxPartitionFetchBytes)
+ .load()
+
+ // UDF instead of current_timestamp(): the built-in is evaluated once per batch
+ // for streaming determinism, but we want per-row wall-clock to measure per-record
+ // latency.
+ val currentTimestampUDF = udf(() => System.currentTimeMillis())
+
+ val streamWithObserved = kafkaStream
+ .withColumn("value", base64(col("value")))
+ .withColumn(
+ "headers",
+ array(
+ struct(
+ lit("source-timestamp") as "key",
+ toUnixMillis(col("timestamp")).cast("STRING").cast("BINARY") as "value")))
+ .withColumn("temp-timestamp", currentTimestampUDF())
+ .withColumn(
+ "latency",
+ col("temp-timestamp").cast("long") - toUnixMillis(col("timestamp")).cast("long"))
+ // Kept deliberately even though the latency columns are dropped before the sink:
+ // (1) exercises the observe() API in the hot path so any RTM regression in observe
+ // overhead is visible in the e2e numbers, and
+ // (2) surfaces per-batch latency metrics on StreamingQueryProgress / Spark UI for
+ // live monitoring during a run (not written to the result file).
+ .observe(
+ name = "observedLatency",
+ avg(col("latency")).as("avg"),
+ max(col("latency")).as("max"),
+ percentile_approx(col("latency"), lit(0.99), lit(10000)).as("p99"),
+ percentile_approx(col("latency"), lit(0.5), lit(10000)).as("p50"))
+ .drop(col("latency"))
+ .drop(col("temp-timestamp"))
+ .drop(col("timestamp"))
+
+ val query = streamWithObserved.writeStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("topic", outputTopic)
+ .option("checkpointLocation", checkpointDir.getAbsolutePath)
+ .option("kafka.buffer.memory", kafkaBufferMemoryBytes)
+ .option("kafka.compression.type", "snappy")
+ .outputMode("update")
+ .queryName("rtm-kafka-kafka")
+ .trigger(RealTimeTrigger.apply(s"${batchDuration.toMillis} milliseconds"))
+ .start()
+
+ val dataGenThread = new Thread(() => {
+ genData(testUtils.brokerAddress, inputTopic)
+ })
+ dataGenThread.setDaemon(true)
+ dataGenThread.start()
+
+ val latch = new CountDownLatch(1)
+ val batchesCompleted = new AtomicLong(0)
+ val listener = new StreamingQueryListener {
+ override def onQueryStarted(
+ event: StreamingQueryListener.QueryStartedEvent): Unit = {}
+
+ override def onQueryTerminated(
+ event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
+
+ override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+ if (batchesCompleted.incrementAndGet() >= numBatches) {
+ latch.countDown()
+ }
+ }
+ }
+ spark.streams.addListener(listener)
+
+ val timeoutMs = numBatches * batchDuration.toMillis * 2 + 60 * 1000
+ val completed = try {
+ latch.await(timeoutMs, TimeUnit.MILLISECONDS)
+ } finally {
+ spark.streams.removeListener(listener)
+ query.stop()
+ dataGenThread.interrupt()
+ dataGenThread.join(30 * 1000)
+ }
+ if (!completed) {
+ throw new RuntimeException(
+ s"Benchmark timed out waiting for $numBatches batches to complete after ${timeoutMs}ms.")
+ }
+
+ getLatencies(outputTopic)
+ }
+
+ private def genData(url: String, topicName: String): Unit = {
+ logInfo(s"Producing to $url topic $topicName at $recordsPerSecond records / sec")
+
+ val props: Properties = new Properties()
+ props.put("bootstrap.servers", url)
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+
+ val producer: Producer[String, String] = new KafkaProducer[String, String](props)
+ val success = new AtomicLong(0)
+ val timer = new Timer()
+
+ try {
+ timer.scheduleAtFixedRate(
+ new TimerTask() {
+ override def run(): Unit = {
+ logInfo("Throughput: " + success.getAndSet(0) + " requests/sec")
+ }
+ },
+ 1000,
+ 1000
+ )
+
+ var i = 0L
+ val startTime = System.nanoTime
+ val delay = (Math.pow(10, 9) / recordsPerSecond).asInstanceOf[Long]
+ var nextDeadline = startTime + delay
+ while (true) {
+ var currentTime = System.nanoTime
+ if (currentTime >= nextDeadline) {
+ i += 1
+ nextDeadline = startTime + (i * delay)
+ producer.send(
+ new ProducerRecord[String, String](
+ topicName,
+ java.lang.Long.toString(i),
+ java.lang.Long.toString(System.currentTimeMillis())
+ ),
+ new Callback {
+ override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
+ if (e != null) {
+ logError("Got exception producing to kafka", e)
+ } else {
+ success.incrementAndGet()
+ }
+ }
+ }
+ )
+ currentTime = System.nanoTime
+
+ val sleepTimeNs =
+ if ((nextDeadline - currentTime) > 0) nextDeadline - currentTime
+ else 0
+ if (sleepTimeNs > 0) {
+ val sleepTimeMs = sleepTimeNs.nanoseconds.toMillis
+ val sleepTimeNano = (sleepTimeNs - sleepTimeMs.milliseconds.toNanos).toInt
+ Thread.sleep(sleepTimeMs, sleepTimeNano)
+ }
+ }
+ }
+ } catch {
+ case _: InterruptedException => // expected on shutdown
+ } finally {
+ timer.cancel()
+ producer.close()
+ }
+ }
+
+ private def printLatenciesTable(viewName: String, colName: String): Unit = {
+ val results = spark.sqlContext
+ .sql(s"""SELECT percentile_approx($colName, Array(0.0, 0.5, 0.9, 0.95, 0.99, 1.0), 10000)
+ | FROM $viewName""".stripMargin)
+ .collect()(0)(0)
+
+ if (results == null) {
+ throw new RuntimeException(
+ s"No results found in table $viewName when trying to print latency for $colName. " +
+ s"The benchmark may need more batches or a longer duration to produce enough data."
+ )
+ }
+
+ val latencies = results.asInstanceOf[scala.collection.Seq[_]]
+
+ val percentiles = Array("p0", "p50", "p90", "p95", "p99", "p100")
+ val latenciesTable = percentiles
+ .zip(latencies)
+ .map(pair => pair._1 + ": " + pair._2)
+ .mkString("\n")
+
+ // Include JVM/OS/processor info so result files are comparable across runs, matching
+ // the header that org.apache.spark.benchmark.Benchmark.run() emits.
+ val envHeader =
+ s"${Benchmark.getJVMOSInfo()}\n${Benchmark.getProcessorName()}\n"
+ val message =
+ envHeader + s"Kafka to kafka query ${colName} in milliseconds is\n" + latenciesTable + "\n"
+
+ output match {
+ case Some(out) => out.write(message.getBytes)
+ case None => logInfo("\n" + message)
+ }
+ }
+
+ private def getLatencies(outputTopic: String): Unit = {
+ val kafkaSinkData = spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", outputTopic)
+ .option("includeHeaders", "true")
+ .load()
+ .withColumn("headers-map", map_from_entries(col("headers")))
+ .withColumn("source-timestamp",
+ col("headers-map.source-timestamp").cast("STRING").cast("BIGINT"))
+ .withColumn("sink-timestamp", toUnixMillis(col("timestamp")))
+
+ val numRecordsInSink = kafkaSinkData.count()
+ val minimumSourceTimestamp =
+ kafkaSinkData.agg(min("source-timestamp")).collect()(0)(0).asInstanceOf[Long]
+
+ val timeFilterThresholdMs = batchDuration.toMillis * numBatchesToFilter
+ val filteredSink = kafkaSinkData
+ .withColumn("time", col("source-timestamp") - minimumSourceTimestamp)
+ .filter(col("time") > timeFilterThresholdMs)
+
+ if (filteredSink.count() == 0) {
+ if (numRecordsInSink > 0) {
+ throw new RuntimeException(
+ s"There were ${numRecordsInSink} records in the Kafka sink topic $outputTopic, " +
+ s"but none remained after filtering the first ${numBatchesToFilter} batch(es) " +
+ s"(${timeFilterThresholdMs} ms). Run more batches (current: ${numBatches})."
+ )
+ } else {
+ throw new RuntimeException(
+ s"No results were found in the Kafka sink topic $outputTopic. " +
+ s"The query may not have produced results or the sink topic was incorrect."
+ )
+ }
+ }
+
+ val sinkWithLatencies = filteredSink
+ .withColumn("e2e_latency", col("sink-timestamp") - col("source-timestamp"))
+ sinkWithLatencies.createOrReplaceTempView("sink_with_latencies")
+
+ printLatenciesTable("sink_with_latencies", "e2e_latency")
+ }
+
+ // Named to avoid shadowing org.apache.spark.sql.functions.unix_millis (imported above).
+ // Goes through DOUBLE seconds * 1000 cast to LONG, which truncates sub-millisecond precision;
+ // safe here because every call site passes a Kafka record timestamp, already at ms
+ // resolution.
+ private def toUnixMillis(column: Column): Column = {
+ (column.cast("timestamp").cast("double") * 1000).cast("long")
+ }
+}