This is an automated email from the ASF dual-hosted git repository.
HeartSaVioR pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new ed940dd418b5 [SPARK-56537][SS] Reset per-batch time fields and
customMetrics on no batch trigger progress event
ed940dd418b5 is described below
commit ed940dd418b51bf06d30ce313d2e1463f90ad5db
Author: Dhruv Patel <[email protected]>
AuthorDate: Thu May 14 08:07:18 2026 -0700
[SPARK-56537][SS] Reset per-batch time fields and customMetrics on no batch
trigger progress event
### What changes were proposed in this pull request?
This PR is a follow-up to
[SPARK-56464](https://issues.apache.org/jira/browse/SPARK-56464) (commit
`930c3039871`), which left a `TODO(SPARK-56537)` in
`ProgressReporter#resetExecStatsForNoExecution` to track the remaining
per-batch fields on `StateOperatorProgress` that were not being reset on
no-data trigger progress events.
Three changes:
1. **Reset the per-batch time fields on no-data trigger progress events.**
`allUpdatesTimeMs`, `allRemovalsTimeMs`, and `commitTimeMs` are now reset to 0
alongside the row-count fields (`numRowsUpdated`, `numRowsRemoved`,
`numRowsDroppedByWatermark`) that were already handled by SPARK-56464.
2. **Reset per-batch entries of `customMetrics` while preserving snapshot
entries.** `StateOperatorProgress.customMetrics` carries values from two metric
registries (`StateStoreCustomMetric` for provider-level,
`StatefulOperatorCustomMetric` for operator-level) and conflates per-batch
counters/timings with snapshot reads of state-store status (current memory
usage, key counts, file size). On a no-data trigger we now zero per-batch
entries and preserve snapshot entries.
The snapshot/per-batch distinction is encoded at the metric definition
via a new `isSnapshot: Boolean` member on the `StateStoreCustomMetric` trait
(default `false`). The six snapshot metrics, all `StateStoreCustomSizeMetric`
instances, are marked at their definitions:
- RocksDB (5): `rocksdbSstFileSize`, `rocksdbPinnedBlocksMemoryUsage`,
`rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`,
`rocksdbNumInternalColumnFamilies`.
- HDFSBackedStateStoreProvider (1): `stateOnCurrentVersionSizeBytes`.
`StateStoreCustomTimingMetric` and `StateStoreCustomSumMetric` keep
using the trait default (always per-batch). Operator-level
`StatefulOperatorCustomSumMetric` instances (declared by
`BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, and
`TransformWithStateExecBase`) are also always per-batch.
3. **Centralize the reset semantics in a new `copyForNoExecution()` method
on `StateOperatorProgress`** instead of growing `copy(...)`'s parameter list
further. The method takes no parameters; it inspects the operator instance's
`snapshotCustomMetricNames` (a new `private[spark]` constructor field,
defaulted to `Set.empty`, populated at progress build time by
`StateStoreWriter.getProgress`) to decide which `customMetrics` keys to
preserve. The existing 3-arg `copy(newNumRowsUpdated, n [...]
The `TODO(SPARK-56537)` comment is removed from
`ProgressReporter#resetExecStatsForNoExecution`, whose body is reduced to a
single delegating map:
`originExecStats.stateOperators.map(_.copyForNoExecution())`.
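The interplay between the `isSnapshot` flag and the reset can be sketched
outside Spark. The snippet below is a simplified model, not the code in this
commit: the `*Sketch` types, the helper names, and the metric values are
hypothetical stand-ins, and only the two metric names are taken from the list
above.

```scala
// Hypothetical stand-in for StateStoreCustomMetric: per-batch unless overridden.
trait CustomMetricSketch {
  def name: String
  def isSnapshot: Boolean = false
}

// Sum metrics keep the trait default, so they are always per-batch.
case class SumMetricSketch(name: String) extends CustomMetricSketch

// Size metrics can opt in to snapshot semantics at their definition.
case class SizeMetricSketch(name: String, override val isSnapshot: Boolean = false)
  extends CustomMetricSketch

object NoExecutionResetSketch {
  // Collect the names of metrics flagged as snapshots.
  def snapshotNames(metrics: Seq[CustomMetricSketch]): Set[String] =
    metrics.collect { case m if m.isSnapshot => m.name }.toSet

  // Keep every key; zero the value unless the key names a snapshot metric.
  def resetForNoExecution(
      values: Map[String, Long],
      snapshots: Set[String]): Map[String, Long] =
    values.map { case (k, v) => k -> (if (snapshots.contains(k)) v else 0L) }

  def main(args: Array[String]): Unit = {
    val metrics = Seq(
      SumMetricSketch("rocksdbPutCount"),
      SizeMetricSketch("rocksdbSstFileSize", isSnapshot = true))
    // Invented values standing in for the last data batch.
    val lastBatch = Map("rocksdbPutCount" -> 3L, "rocksdbSstFileSize" -> 4096L)
    val idle = resetForNoExecution(lastBatch, snapshotNames(metrics))
    assert(idle == Map("rocksdbPutCount" -> 0L, "rocksdbSstFileSize" -> 4096L))
  }
}
```

Because the flag is declared next to each metric, the reset routine needs no
list of metric names of its own.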
### Why are the changes needed?
Today, on a no-data ("idle") trigger progress event,
`StateOperatorProgress` carries the previous batch's values for
`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`, and most of
`customMetrics`. To a user reading `query.lastProgress` /
`query.recentProgress` during an idle period this looks like work was performed
when none was. It is also a known source of test flakiness.
The `TODO(SPARK-56537)` left by SPARK-56464 in
`ProgressReporter#resetExecStatsForNoExecution` explicitly tracks this
follow-up.
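As a rough illustration of why stale values mislead, consider a monitor that
sums `commitTimeMs` over recent progress events. The sketch below is a
simplified model with a hypothetical `OpProgressSketch` type and invented
timings; it does not use the Spark API.

```scala
// Hypothetical, simplified stand-in for one stateful operator's progress entry.
case class OpProgressSketch(hasBatch: Boolean, commitTimeMs: Long)

object IdleProgressSketch {
  // Total commit time a naive monitor would report by summing every event.
  def totalCommitMs(events: Seq[OpProgressSketch]): Long =
    events.map(_.commitTimeMs).sum

  def main(args: Array[String]): Unit = {
    // One data batch (40 ms is an invented number) followed by two idle triggers
    // that carry the previous batch's value, as before this change.
    val before = Seq(
      OpProgressSketch(hasBatch = true, commitTimeMs = 40L),
      OpProgressSketch(hasBatch = false, commitTimeMs = 40L),
      OpProgressSketch(hasBatch = false, commitTimeMs = 40L))
    // With the reset applied, idle events report 0 for per-batch fields.
    val after = before.map(e => if (e.hasBatch) e else e.copy(commitTimeMs = 0L))

    assert(totalCommitMs(before) == 120L) // inflated by the two idle events
    assert(totalCommitMs(after) == 40L)   // matches the work actually done
  }
}
```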
The design was discussed on the JIRA ticket and confirmed before
implementation:
- Encode snapshot semantics at the metric definition (option (2b) in the
audit comment), not via a hardcoded whitelist in the reset routine.
- Add a new `copyForNoExecution()` method on `StateOperatorProgress` rather
than growing the existing `copy(...)` argument list further (3 args after
SPARK-56464 would have become 6+).
### Does this PR introduce _any_ user-facing change?
No public API change.
User-visible behavior change: idle-trigger progress events emitted via
`StreamingQueryListener.QueryProgressEvent`, `query.lastProgress`, and
`query.recentProgress` will now report `0` for all per-batch fields and
per-batch `customMetrics` entries instead of carrying stale values from the
previous data batch. Snapshot fields (`numRowsTotal`, `memoryUsedBytes`,
`numShufflePartitions`, `numStateStoreInstances`, snapshot custom metrics) are
unchanged. Same direction as the SPARK-56464 fi [...]
### How was this patch tested?
New and updated tests in `ProgressReporterSuite.scala`:
1. Extended the SPARK-56464 test with assertions that the three time fields
(`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`) are reset to 0 on the
idle trigger, alongside the existing row-count assertions. Test description
updated from "no-data batch resets numRowsRemoved to zero" to "no-data batch
resets all per-batch StateOperatorProgress fields to zero" to reflect the
broader scope.
2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but
preserves snapshot customMetrics (RocksDB)" exercising the per-batch / snapshot
split end-to-end against a real `RocksDBStateStoreProvider`. The test runs one
data batch, advances the manual clock to trigger an idle progress event, then
asserts that 3 per-batch RocksDB metrics (`rocksdbCommitFlushLatency`,
`rocksdbPutCount`, `rocksdbTotalBytesWritten`) are reset to 0 on idle, while 5
snapshot RocksDB metrics (` [...]
Local verification:
- `build/sbt 'sql/testOnly *ProgressReporterSuite'` -> 2/2 tests pass.
- `build/sbt 'sql/testOnly *ProgressReporterSuite
*StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite
*StreamingDeduplicationSuite *MultiStatefulOperatorsSuite'` -> 240 tests pass
in 7m 16s.
- `dev/mima` -> no exclusions required.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes #55699 from DHRUV6029/SPARK-56537.
Authored-by: Dhruv Patel <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 83dc6efb16db78cd1fed40ca9a942f8632b92e8f)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/tests/streaming/test_streaming_listener.py | 7 +-
.../org/apache/spark/sql/streaming/progress.scala | 34 +++++-
.../operators/stateful/statefulOperators.scala | 46 +++++--
.../streaming/runtime/ProgressReporter.scala | 6 +-
.../state/HDFSBackedStateStoreProvider.scala | 3 +-
.../state/RocksDBStateStoreProvider.scala | 17 ++-
.../sql/execution/streaming/state/StateStore.scala | 9 +-
.../streaming/ProgressReporterSuite.scala | 132 ++++++++++++++++++++-
8 files changed, 227 insertions(+), 27 deletions(-)
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
index 620186f70b66..b4922f54b217 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -312,7 +312,12 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):
)
self.assertEqual(
get_number_of_public_methods("org.apache.spark.sql.streaming.StateOperatorProgress"),
- 27,
+ # SPARK-56537: bumped from 27 to 30 due to the new snapshotCustomMetricNames
+ # constructor parameter (getter + synthetic default) and the new internal
+ # copyForNoExecution() method on StateOperatorProgress. Both are non-public
+ # API (private[spark] / private[sql]) so they are not mirrored on the
+ # Python side; only the count needs updating.
+ 30,
msg,
)
self.assertEqual(
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 0502936e3cc4..619d8fb53311 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -56,7 +56,10 @@ class StateOperatorProgress private[spark] (
val numRowsDroppedByWatermark: Long,
val numShufflePartitions: Long,
val numStateStoreInstances: Long,
- val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+ val customMetrics: ju.Map[String, JLong] = new ju.HashMap(),
+ // Names of customMetrics entries treated as snapshots of state-store status;
+ // preserved by copyForNoExecution() and not surfaced in JSON output.
+ private[spark] val snapshotCustomMetricNames: Set[String] = Set.empty)
extends Serializable {
/** The compact JSON representation of this progress. */
@@ -81,7 +84,34 @@ class StateOperatorProgress private[spark] (
numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
numShufflePartitions = numShufflePartitions,
numStateStoreInstances = numStateStoreInstances,
- customMetrics = customMetrics)
+ customMetrics = customMetrics,
+ snapshotCustomMetricNames = snapshotCustomMetricNames)
+
+ /**
+ * Returns a copy of this progress suitable for a no-data trigger event. Per-batch fields (row
+ * counts, time-Ms fields, and customMetrics entries not in `snapshotCustomMetricNames`) are
+ * zeroed; snapshot fields and snapshot customMetrics entries are preserved.
+ */
+ private[sql] def copyForNoExecution(): StateOperatorProgress = {
+ val newCustomMetrics = new ju.HashMap[String, JLong](customMetrics.size())
+ customMetrics.forEach { (k, v) =>
+ newCustomMetrics.put(k, if (snapshotCustomMetricNames.contains(k)) v else 0L)
+ }
+ new StateOperatorProgress(
+ operatorName = operatorName,
+ numRowsTotal = numRowsTotal,
+ numRowsUpdated = 0L,
+ allUpdatesTimeMs = 0L,
+ numRowsRemoved = 0L,
+ allRemovalsTimeMs = 0L,
+ commitTimeMs = 0L,
+ memoryUsedBytes = memoryUsedBytes,
+ numRowsDroppedByWatermark = 0L,
+ numShufflePartitions = numShufflePartitions,
+ numStateStoreInstances = numStateStoreInstances,
+ customMetrics = newCustomMetrics,
+ snapshotCustomMetricNames = snapshotCustomMetricNames)
+ }
private[sql] def jsonValue: JValue = {
("operatorName" -> JString(operatorName)) ~
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
index 9fcc0a506570..59a2b9ee74f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
@@ -137,6 +137,9 @@ trait StatefulOperatorCustomMetric {
def name: String
def desc: String
def createSQLMetric(sparkContext: SparkContext): SQLMetric
+ // True if the metric reflects current state rather than per-batch work; snapshot
+ // metrics are preserved on no-data trigger events. Mirrors StateStoreCustomMetric.
+ def isSnapshot: Boolean = false
}
/** Custom stateful operator metric for simple "count" gauge */
@@ -402,7 +405,8 @@ trait StateStoreWriter
numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value,
numShufflePartitions = stateInfo.map(_.numPartitions.toLong).getOrElse(-1L),
numStateStoreInstances = longMetric("numStateStoreInstances").value,
- javaConvertedCustomMetrics
+ javaConvertedCustomMetrics,
+ snapshotCustomMetricNames
)
}
@@ -475,17 +479,43 @@ trait StateStoreWriter
}.toMap
}
- private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = {
+ // All instance metrics with their (partitionId, storeName) bindings; consumed by
+ // both `stateStoreInstanceMetrics` (for SQLMetric registration) and
+ // `snapshotCustomMetricNames` (for the snapshot-name set). The result is a
+ // serializable Seq so storing it as a lazy val on this trait is safe even when
+ // the enclosing SparkPlan is shipped to executors. The provider itself is NOT
+ // stored as a field (it is non-serializable), so each consumer below recreates
+ // it locally.
+ private lazy val stateStoreInstanceMetricsWithIds: Seq[StateStoreInstanceMetric] = {
val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
- val maxPartitions = stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions)
-
+ val maxPartitions =
+ stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions)
(0 until maxPartitions).flatMap { partitionId =>
provider.supportedInstanceMetrics.flatMap { metric =>
- stateStoreNames.map { storeName =>
- val metricWithPartition = metric.withNewId(partitionId, storeName)
- (metricWithPartition, metricWithPartition.createSQLMetric(sparkContext))
- }
+ stateStoreNames.map(metric.withNewId(partitionId, _))
}
+ }
+ }
+
+ // Names of customMetrics entries treated as snapshots; preserved by
+ // StateOperatorProgress.copyForNoExecution() on no-data trigger events. Includes
+ // provider- and operator-level metrics with isSnapshot = true, and all instance
+ // metric names (instance metrics use sentinel inits like -1 with monotonic
+ // combine, so they are always snapshot-style).
+ private lazy val snapshotCustomMetricNames: Set[String] = {
+ val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
+ val customSnapshots = provider.supportedCustomMetrics.collect {
+ case m if m.isSnapshot => m.name
+ }.toSet
+ val operatorSnapshots = customStatefulOperatorMetrics.collect {
+ case m if m.isSnapshot => m.name
+ }.toSet
+ customSnapshots ++ operatorSnapshots ++ stateStoreInstanceMetricsWithIds.map(_.name).toSet
+ }
+
+ private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = {
+ stateStoreInstanceMetricsWithIds.map { metric =>
+ (metric, metric.createSQLMetric(sparkContext))
}.toMap
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
index 73b75df1a599..161696fb9260 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
@@ -648,13 +648,9 @@ abstract class ProgressContext(
* New execution stats will only retain the values as a snapshot of the query status.
* (E.g. for stateful operators, numRowsTotal is a snapshot of the status, whereas
* numRowsUpdated is bound to the batch.)
- * TODO(SPARK-56537): We do not seem to clear up all values in StateOperatorProgress which are
- * bound to the batch. Fix this.
*/
private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): ExecutionStats = {
- val newStatefulOperators = originExecStats.stateOperators.map { so =>
- so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0, newNumRowsRemoved = 0)
- }
+ val newStatefulOperators = originExecStats.stateOperators.map(_.copyForNoExecution())
val newEventTimeStats = if (originExecStats.eventTimeStats.contains("watermark")) {
Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 29a17b4eb7ec..2562f1ff3304 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -566,7 +566,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
- "estimated size of state only on current version")
+ "estimated size of state only on current version",
+ isSnapshot = true)
private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheHitCount",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 45168f407132..b3d734c71f91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -1488,22 +1488,27 @@ object RocksDBStateStoreProvider {
val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric(
"rocksdbTotalBytesWrittenByFlush",
"RocksDB: flush - total bytes written by flush")
+ // Snapshot metrics: read current RocksDB state, preserved on no-data trigger events.
val CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE = StateStoreCustomSizeMetric(
"rocksdbPinnedBlocksMemoryUsage",
- "RocksDB: memory usage for pinned blocks")
+ "RocksDB: memory usage for pinned blocks",
+ isSnapshot = true)
val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS = StateStoreCustomSizeMetric(
"rocksdbNumInternalColFamiliesKeys",
- "RocksDB: number of internal keys for internal column families")
+ "RocksDB: number of internal keys for internal column families",
+ isSnapshot = true)
val CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric(
"rocksdbNumExternalColumnFamilies",
- "RocksDB: number of external column families")
+ "RocksDB: number of external column families",
+ isSnapshot = true)
val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric(
"rocksdbNumInternalColumnFamilies",
- "RocksDB: number of internal column families")
+ "RocksDB: number of internal column families",
+ isSnapshot = true)
- // Total SST file size
+ // Total SST file size (snapshot).
val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric(
- "rocksdbSstFileSize", "RocksDB: size of all SST files")
+ "rocksdbSstFileSize", "RocksDB: size of all SST files", isSnapshot = true)
val CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED = StateStoreCustomSumMetric(
"rocksdbNumSnapshotsAutoRepaired",
"RocksDB: number of snapshots that were automatically repaired during store load")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index e3601f1ef224..ad067b8edcc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -537,6 +537,10 @@ trait StateStoreCustomMetric {
def desc: String
def withNewDesc(desc: String): StateStoreCustomMetric
def createSQLMetric(sparkContext: SparkContext): SQLMetric
+
+ // True if the metric reflects current store state (e.g. file size, memory) rather
+ // than per-batch work; snapshot metrics are preserved on no-data trigger events.
+ def isSnapshot: Boolean = false
}
case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric {
@@ -546,7 +550,10 @@ case class StateStoreCustomSumMetric(name: String, desc: String) extends StateSt
SQLMetrics.createMetric(sparkContext, desc)
}
-case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric {
+case class StateStoreCustomSizeMetric(
+ name: String,
+ desc: String,
+ override val isSnapshot: Boolean = false) extends StateStoreCustomMetric {
override def withNewDesc(desc: String): StateStoreCustomSizeMetric =
copy(desc = desc)
override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala
index da037936849e..134bfc6b914b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala
@@ -20,15 +20,16 @@ package org.apache.spark.sql.execution.streaming
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress, StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
class ProgressReporterSuite extends StreamTest {
import testImplicits._
- test("no-data batch resets numRowsRemoved to zero" +
+ test("no-data batch resets all per-batch StateOperatorProgress fields to zero" +
" via resetExecStatsForNoExecution") {
val clock = new StreamManualClock
val input = MemoryStream[Int]
@@ -73,7 +74,7 @@ class ProgressReporterSuite extends StreamTest {
.exists(_.stateOperators.head.numRowsRemoved > 0)
assert(removed, "Expected numRowsRemoved > 0")
},
- // Idle trigger — finishNoExecutionTrigger calls
+ // Idle trigger: finishNoExecutionTrigger calls
// resetExecStatsForNoExecution which must zero out
// per-batch metrics.
AdvanceManualClock(1 * 1000),
@@ -94,10 +95,135 @@ class ProgressReporterSuite extends StreamTest {
assert(so.numRowsUpdated === 0,
s"numRowsUpdated=${so.numRowsUpdated}")
assert(so.numRowsDroppedByWatermark === 0,
s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}")
+ assert(so.allUpdatesTimeMs === 0,
+ s"allUpdatesTimeMs=${so.allUpdatesTimeMs}")
+ assert(so.allRemovalsTimeMs === 0,
+ s"allRemovalsTimeMs=${so.allRemovalsTimeMs}")
+ assert(so.commitTimeMs === 0,
+ s"commitTimeMs=${so.commitTimeMs}")
}
},
StopStream
)
}
}
+
+ test("SPARK-56537: no-data batch resets per-batch customMetrics but" +
+ " preserves snapshot customMetrics (RocksDB)") {
+ val clock = new StreamManualClock
+ val input = MemoryStream[Int]
+ val agg = input.toDF()
+ .select(timestamp_seconds($"value") as "ts", $"value")
+ .withWatermark("ts", "10 seconds")
+ .groupBy(window($"ts", "10 seconds"))
+ .agg(count("*") as "cnt")
+ .select($"window".getField("start").cast("long"), $"cnt")
+
+ withSQLConf(
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.STREAMING_POLLING_DELAY.key -> "0",
+ SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "0") {
+ testStream(agg, outputMode = OutputMode.Update)(
+ StartStream(
+ Trigger.ProcessingTime("1 second"),
+ triggerClock = clock),
+ // Batch 0: real data, populates customMetrics with non-zero per-batch values.
+ AddData(input, 1, 2, 3),
+ AdvanceManualClock(1 * 1000),
+ CheckNewAnswer((0L, 3L)),
+ // Idle trigger.
+ AdvanceManualClock(1 * 1000),
+ Execute("verify customMetrics behavior on idle trigger") { q =>
+ eventually(Timeout(streamingTimeout)) {
+ val progress = q.recentProgress.filter(_.stateOperators.nonEmpty)
+ val lastDataIdx = progress.lastIndexWhere { p =>
+ p.durationMs.containsKey("addBatch")
+ }
+ assert(lastDataIdx >= 0, "no data batch found")
+ val idleIdx = progress.indexWhere(
+ !_.durationMs.containsKey("addBatch"), lastDataIdx + 1)
+ assert(idleIdx > lastDataIdx,
+ "no idle trigger found after data batch")
+
+ val dataCm = progress(lastDataIdx).stateOperators.head.customMetrics
+ val idleCm = progress(idleIdx).stateOperators.head.customMetrics
+
+ // Per-batch RocksDB metrics: zeroed on idle. The metric must be present
+ // in the map (we keep keys consistent across data and idle progress).
+ Seq("rocksdbCommitFlushLatency",
+ "rocksdbPutCount",
+ "rocksdbTotalBytesWritten").foreach { k =>
+ assert(idleCm.containsKey(k), s"$k missing on idle")
+ assert(idleCm.get(k) === 0L,
+ s"per-batch metric $k expected 0 on idle, got ${idleCm.get(k)}")
+ }
+
+ // Snapshot RocksDB metrics: value unchanged across idle trigger.
+ Seq("rocksdbPinnedBlocksMemoryUsage",
+ "rocksdbNumInternalColFamiliesKeys",
+ "rocksdbNumExternalColumnFamilies",
+ "rocksdbNumInternalColumnFamilies",
+ "rocksdbSstFileSize").foreach { k =>
+ assert(idleCm.containsKey(k), s"$k missing on idle")
+ assert(idleCm.get(k) === dataCm.get(k),
+ s"snapshot metric $k changed across idle trigger: " +
+ s"data=${dataCm.get(k)} idle=${idleCm.get(k)}")
+ }
+ }
+ },
+ StopStream
+ )
+ }
+ }
+
+ test("SPARK-56537: copyForNoExecution zeroes per-batch fields and preserves snapshot fields") {
+ val customMetrics = new java.util.HashMap[String, java.lang.Long]()
+ customMetrics.put("perBatchTimer", 100L)
+ customMetrics.put("perBatchCounter", 50L)
+ customMetrics.put("snapshotSize", 999L)
+ val orig = new StateOperatorProgress(
+ operatorName = "op",
+ numRowsTotal = 50L,
+ numRowsUpdated = 10L,
+ allUpdatesTimeMs = 7L,
+ numRowsRemoved = 3L,
+ allRemovalsTimeMs = 5L,
+ commitTimeMs = 11L,
+ memoryUsedBytes = 2048L,
+ numRowsDroppedByWatermark = 2L,
+ numShufflePartitions = 4L,
+ numStateStoreInstances = 4L,
+ customMetrics = customMetrics,
+ snapshotCustomMetricNames = Set("snapshotSize"))
+
+ val out = orig.copyForNoExecution()
+
+ // Per-batch fields are zeroed.
+ assert(out.numRowsUpdated === 0L)
+ assert(out.allUpdatesTimeMs === 0L)
+ assert(out.numRowsRemoved === 0L)
+ assert(out.allRemovalsTimeMs === 0L)
+ assert(out.commitTimeMs === 0L)
+ assert(out.numRowsDroppedByWatermark === 0L)
+
+ // Snapshot fields are preserved.
+ assert(out.operatorName === "op")
+ assert(out.numRowsTotal === 50L)
+ assert(out.memoryUsedBytes === 2048L)
+ assert(out.numShufflePartitions === 4L)
+ assert(out.numStateStoreInstances === 4L)
+
+ // customMetrics: per-batch zeroed, snapshot preserved.
+ assert(out.customMetrics.get("perBatchTimer") === 0L)
+ assert(out.customMetrics.get("perBatchCounter") === 0L)
+ assert(out.customMetrics.get("snapshotSize") === 999L)
+
+ // Original is not mutated.
+ assert(orig.numRowsUpdated === 10L)
+ assert(orig.customMetrics.get("perBatchTimer") === 100L)
+
+ // snapshotCustomMetricNames is preserved so subsequent copy() round-trips work.
+ assert(out.snapshotCustomMetricNames === Set("snapshotSize"))
+ }
}