This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d9e1ee445762 [SPARK-55494][SS] Introduce iterator/prefixScan with
multi-values in StateStore API
d9e1ee445762 is described below
commit d9e1ee44576285f07c4d1803eec411a764a48d8b
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Feb 19 06:53:15 2026 +0900
[SPARK-55494][SS] Introduce iterator/prefixScan with multi-values in
StateStore API
### What changes were proposed in this pull request?
This PR proposes to introduce iterator/prefixScan with multi-values in the
StateStore API.
### Why are the changes needed?
The functionality is missing from the StateStore API, so when the caller sets
multi-values for a specific CF, that CF doesn't support scanning through the
data. The new functionality will be used in the new state format version in
stream-stream join, specifically SPARK-55144 (#53930).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UTs.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude-4.5-sonnet
The above tooling was used for creating a new test suite; all other parts
were not generated by an LLM.
Closes #54278 from HeartSaVioR/SPARK-55494.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../state/HDFSBackedStateStoreProvider.scala | 24 +++
.../state/RocksDBStateStoreProvider.scala | 68 ++++++++
.../sql/execution/streaming/state/StateStore.scala | 32 ++++
.../streaming/state/MemoryStateStore.scala | 10 ++
.../RocksDBStateStoreCheckpointFormatV2Suite.scala | 10 ++
.../RocksDBTimestampEncoderOperationsSuite.scala | 178 +++++++++++++++++++--
6 files changed, 313 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3a5d14f5581a..3303b414ccd3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -111,6 +111,17 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
override def allColumnFamilyNames: Set[String] =
Set[String](StateStore.DEFAULT_COL_FAMILY_NAME)
+
+ override def prefixScanWithMultiValues(
+ prefixKey: UnsafeRow,
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ throw
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey",
"HDFSStateStore")
+ }
+
+ override def iteratorWithMultiValues(
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ throw
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey",
"HDFSStateStore")
+ }
}
/** Implementation of [[StateStore]] API which is backed by an
HDFS-compatible file system */
@@ -323,6 +334,19 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
key: UnsafeRow, values: Array[UnsafeRow], colFamilyName: String): Unit
= {
throw StateStoreErrors.unsupportedOperationException("mergeList",
providerName)
}
+
+ override def prefixScanWithMultiValues(
+ prefixKey: UnsafeRow,
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ throw StateStoreErrors.unsupportedOperationException(
+ "prefixScanWithMultiValues", providerName)
+ }
+
+ override def iteratorWithMultiValues(
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ throw StateStoreErrors.unsupportedOperationException(
+ "iteratorWithMultiValues", providerName)
+ }
}
def getMetricsForProvider(): Map[String, Long] = synchronized {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 6dde67dd5b66..d8e0440e210d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -458,6 +458,40 @@ private[sql] class RocksDBStateStoreProvider
}
}
+ override def iteratorWithMultiValues(
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ validateAndTransitionState(UPDATE)
+ // Note this verify function only verify on the colFamilyName being
valid,
+ // we are actually doing prefix when useColumnFamilies,
+ // but pass "iteratorWithMultiValues" to throw correct error message
+ verifyColFamilyOperations("iteratorWithMultiValues", colFamilyName)
+
+ val kvEncoder = keyValueEncoderMap.get(colFamilyName)
+ verify(
+ kvEncoder._2.supportsMultipleValuesPerKey,
+ "Multi-value iterator operation requires an encoder" +
+ " which supports multiple values for a single key")
+
+ val rocksDbIter = rocksDB.iterator(colFamilyName)
+
+ val rowPair = new UnsafeRowPair()
+ val iter = rocksDbIter.flatMap { kv =>
+ val keyRow = kvEncoder._1.decodeKey(kv.key)
+ val valueRows = kvEncoder._2.decodeValues(kv.value)
+ valueRows.iterator.map { valueRow =>
+ rowPair.withRows(keyRow, valueRow)
+ if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+ StateStoreProvider.validateStateRowFormat(
+ rowPair.key, keySchema, rowPair.value, valueSchema,
stateStoreId, storeConf)
+ isValidated = true
+ }
+ rowPair
+ }
+ }
+
+ new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
+ }
+
override def prefixScan(
prefixKey: UnsafeRow,
colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
@@ -481,6 +515,40 @@ private[sql] class RocksDBStateStoreProvider
new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
}
+ override def prefixScanWithMultiValues(
+ prefixKey: UnsafeRow, colFamilyName: String):
StateStoreIterator[UnsafeRowPair] = {
+ validateAndTransitionState(UPDATE)
+ verifyColFamilyOperations("prefixScanWithMultiValues", colFamilyName)
+
+ val kvEncoder = keyValueEncoderMap.get(colFamilyName)
+ verify(kvEncoder._1.supportPrefixKeyScan,
+ "prefixScanWithMultiValues requires encoder supporting prefix scan!")
+ verify(
+ kvEncoder._2.supportsMultipleValuesPerKey,
+ "Multi-value iterator operation requires an encoder" +
+ " which supports multiple values for a single key")
+
+ val prefix = kvEncoder._1.encodePrefixKey(prefixKey)
+ val rocksDbIter = rocksDB.prefixScan(prefix, colFamilyName)
+
+ val rowPair = new UnsafeRowPair()
+ val iter = rocksDbIter.flatMap { kv =>
+ val keyRow = kvEncoder._1.decodeKey(kv.key)
+ val valueRows = kvEncoder._2.decodeValues(kv.value)
+ valueRows.iterator.map { valueRow =>
+ rowPair.withRows(keyRow, valueRow)
+ if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+ StateStoreProvider.validateStateRowFormat(
+ rowPair.key, keySchema, rowPair.value, valueSchema,
stateStoreId, storeConf)
+ isValidated = true
+ }
+ rowPair
+ }
+ }
+
+ new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
+ }
+
var checkpointInfo: Option[StateStoreCheckpointInfo] = None
private var storedMetrics: Option[RocksDBMetrics] = None
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index acb82680d279..e2d7c166a675 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -171,10 +171,31 @@ trait ReadStateStore {
prefixKey: UnsafeRow,
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
StateStoreIterator[UnsafeRowPair]
+ /**
+ * Return an iterator containing all the (key, value) pairs which are
matched with
+ * the given prefix key.
+ *
+ * It is expected to throw exception if Spark calls this method without
proper key encoding spec.
+ * It is also expected to throw exception if Spark calls this method without
setting
+ * multipleValuesPerKey as true for the column family.
+ */
+ def prefixScanWithMultiValues(
+ prefixKey: UnsafeRow,
+ colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
StateStoreIterator[UnsafeRowPair]
+
/** Return an iterator containing all the key-value pairs in the StateStore.
*/
def iterator(
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
StateStoreIterator[UnsafeRowPair]
+ /**
+ * Return an iterator containing all the key-value pairs in the StateStore.
+ *
+ * It is expected to throw exception if Spark calls this method without
setting
+ * multipleValuesPerKey as true for the column family.
+ */
+ def iteratorWithMultiValues(
+ colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
StateStoreIterator[UnsafeRowPair]
+
/**
* Clean up the resource.
*
@@ -384,6 +405,17 @@ class WrappedReadStateStore(store: StateStore) extends
ReadStateStore {
}
override def allColumnFamilyNames: Set[String] = store.allColumnFamilyNames
+
+ override def prefixScanWithMultiValues(
+ prefixKey: UnsafeRow,
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ store.prefixScanWithMultiValues(prefixKey, colFamilyName)
+ }
+
+ override def iteratorWithMultiValues(
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ store.iteratorWithMultiValues(colFamilyName)
+ }
}
/**
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
index ba5237d91305..cf0cab3c4623 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
@@ -93,6 +93,16 @@ class MemoryStateStore extends StateStore() {
throw new UnsupportedOperationException("Doesn't support multiple values
per key")
}
+ override def prefixScanWithMultiValues(
+ prefixKey: UnsafeRow, colFamilyName: String):
StateStoreIterator[UnsafeRowPair] = {
+ throw new UnsupportedOperationException("Doesn't support prefix scan with
multiple values!")
+ }
+
+ override def iteratorWithMultiValues(
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ throw new UnsupportedOperationException("Doesn't support iterator with
multiple values!")
+ }
+
override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = {
StateStoreCheckpointInfo(id.partitionId, version + 1, None, None)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index 4d7443870238..d26f996b5151 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -166,6 +166,16 @@ case class CkptIdCollectingStateStoreWrapper(innerStore:
StateStore) extends Sta
ret
}
override def hasCommitted: Boolean = innerStore.hasCommitted
+
+ override def prefixScanWithMultiValues(
+ prefixKey: UnsafeRow, colFamilyName: String):
StateStoreIterator[UnsafeRowPair] = {
+ innerStore.prefixScanWithMultiValues(prefixKey, colFamilyName)
+ }
+
+ override def iteratorWithMultiValues(
+ colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+ innerStore.iteratorWithMultiValues(colFamilyName)
+ }
}
class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
index 498ac7db20b6..a65565d8b245 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
@@ -223,7 +223,8 @@ class RocksDBTimestampEncoderOperationsSuite extends
SharedSparkSession
}
}
- // TODO: Address the new state format with Avro and enable the test with
Avro encoding
+ // TODO: [SPARK-55145] Address the new state format with Avro and enable the
test with Avro
+ // encoding
Seq("unsaferow").foreach { encoding =>
test(s"Event time as prefix: iterator operations (encoding = $encoding)") {
tryWithProviderResource(
@@ -281,6 +282,70 @@ class RocksDBTimestampEncoderOperationsSuite extends
SharedSparkSession
}
}
+ test(s"Event time as prefix: iterator with multiple values (encoding =
$encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = "prefix",
+ useMultipleValuesPerKey = true,
+ dataEncoding = encoding)
+ ) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ // Put multiple values at different event times
+ // Insert in non-sorted order to verify ordering by event time
+ val values2 = Array(valueToRow(200), valueToRow(201))
+ store.putList(keyAndTimestampToRow("key1", 1, 1000L), values2)
+
+ val values1 = Array(valueToRow(100), valueToRow(101))
+ store.putList(keyAndTimestampToRow("key1", 1, -3000L), values1)
+
+ val values3 = Array(valueToRow(300), valueToRow(301))
+ store.putList(keyAndTimestampToRow("key2", 1, 2000L), values3)
+
+ // Test iteratorWithMultiValues
+ val iterator = store.iteratorWithMultiValues()
+ val results = iterator.map { pair =>
+ assert(pair.key.numFields() === 3) // key fields + timestamp
+
+ val keyStr = pair.key.getString(0)
+ val partitionId = pair.key.getInt(1)
+ // The timestamp will be placed at the end of the key row.
+ val timestamp = pair.key.getLong(2)
+ val value = pair.value.getInt(0)
+ (keyStr, partitionId, timestamp, value)
+ }.toList
+
+ iterator.close()
+
+ // Should return all individual values across different event times
+ assert(results.length === 6) // 2 values at each of 3 event times
+
+ // Verify results are ordered by event time (ascending)
+ val eventTimes = results.map(_._3)
+ val distinctEventTimes = eventTimes.distinct
+ assert(
+ distinctEventTimes === Seq(-3000L, 1000L, 2000L),
+ "Results should be ordered by event time"
+ )
+
+ // Verify the first 2 results are from time -3000L
+ assert(results.take(2).forall(_._3 === -3000L))
+ assert(results.take(2).map(_._4).toSet === Set(100, 101))
+
+ // Verify the next 2 results are from time 1000L
+ assert(results.slice(2, 4).forall(_._3 === 1000L))
+ assert(results.slice(2, 4).map(_._4).toSet === Set(200, 201))
+
+ // Verify the last 2 results are from time 2000L
+ assert(results.slice(4, 6).forall(_._3 === 2000L))
+ assert(results.slice(4, 6).map(_._4).toSet === Set(300, 301))
+ } finally {
+ store.abort()
+ }
+ }
+ }
+
test(s"Event time as postfix: prefix scan operations (encoding =
$encoding)") {
tryWithProviderResource(
newStoreProviderWithTimestampEncoder(encoderType = "postfix",
dataEncoding = encoding)
@@ -336,6 +401,81 @@ class RocksDBTimestampEncoderOperationsSuite extends
SharedSparkSession
}
}
}
+
+ test(s"Event time as postfix: prefix scan with multiple values (encoding =
$encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = "postfix",
+ useMultipleValuesPerKey = true,
+ dataEncoding = encoding
+ )
+ ) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ // Put multiple values for the same key at different event times
+
+ // Insert in non-sorted order to verify ordering by event time
+ val values2 = Array(valueToRow(200), valueToRow(201))
+ store.putList(keyAndTimestampToRow("key1", 1, 1000L), values2)
+
+ val values1 = Array(valueToRow(100), valueToRow(101))
+ store.putList(keyAndTimestampToRow("key1", 1, -3000L), values1)
+
+ val values3 = Array(valueToRow(300), valueToRow(301))
+ store.putList(keyAndTimestampToRow("key1", 1, 2000L), values3)
+
+ // Different key - should not be returned
+ val values4 = Array(valueToRow(400))
+ store.putList(keyAndTimestampToRow("key2", 1, 1500L), values4)
+
+ // Test prefixScanWithMultiValues - pass complete key to find all
event times
+ val key1 = keyToRow("key1", 1)
+ val iterator = store.prefixScanWithMultiValues(key1)
+
+ val results = iterator.map { pair =>
+ assert(pair.key.numFields() === 3) // key fields + timestamp
+
+ val keyStr = pair.key.getString(0)
+ val partitionId = pair.key.getInt(1)
+ // The timestamp will be placed at the end of the key row.
+ val timestamp = pair.key.getLong(2)
+ val value = pair.value.getInt(0)
+ (keyStr, partitionId, timestamp, value)
+ }.toList
+ iterator.close()
+
+ // Should return all individual values for key1 across different
event times
+ assert(results.length === 6) // 2 values at each of 3 event times
+
+ // Verify results are ordered by event time (ascending)
+ // Group by event time to verify ordering
+ val eventTimes = results.map(_._3)
+ val distinctEventTimes = eventTimes.distinct
+ assert(
+ distinctEventTimes === Seq(-3000L, 1000L, 2000L),
+ "Results should be ordered by event time"
+ )
+
+ // Verify the first 2 results are from time -3000L
+ assert(results.take(2).forall(_._3 === -3000L))
+ assert(results.take(2).map(_._4).toSet === Set(100, 101))
+
+ // Verify the next 2 results are from time 1000L
+ assert(results.slice(2, 4).forall(_._3 === 1000L))
+ assert(results.slice(2, 4).map(_._4).toSet === Set(200, 201))
+
+ // Verify the last 2 results are from time 2000L
+ assert(results.slice(4, 6).forall(_._3 === 2000L))
+ assert(results.slice(4, 6).map(_._4).toSet === Set(300, 301))
+
+ // Should not contain key2
+ assert(!results.exists(_._1 == "key2"))
+ } finally {
+ store.abort()
+ }
+ }
+ }
}
// Diverse set of timestamps that exercise binary lexicographic encoding
edge cases,
@@ -351,15 +491,19 @@ class RocksDBTimestampEncoderOperationsSuite extends
SharedSparkSession
* diverse timestamp values that exercise binary lexicographic encoding edge
cases.
*
* @param encoderType "prefix" or "postfix"
+ * @param useMultipleValuesPerKey whether to store multiple values per (key,
timestamp)
* @param encoding data encoding format (e.g. "unsaferow")
*/
private def testDiverseTimestampOrdering(
encoderType: String,
+ useMultipleValuesPerKey: Boolean,
encoding: String): Unit = {
+ val valuesPerKey = if (useMultipleValuesPerKey) 2 else 1
+
tryWithProviderResource(
newStoreProviderWithTimestampEncoder(
encoderType = encoderType,
- useMultipleValuesPerKey = false,
+ useMultipleValuesPerKey = useMultipleValuesPerKey,
dataEncoding = encoding)
) { provider =>
val store = provider.getStore(0)
@@ -368,7 +512,12 @@ class RocksDBTimestampEncoderOperationsSuite extends
SharedSparkSession
// Insert diverse timestamps in non-sorted order
diverseTimestamps.zipWithIndex.foreach { case (ts, idx) =>
val keyRow = keyAndTimestampToRow("key1", 1, ts)
- store.put(keyRow, valueToRow(idx))
+ if (useMultipleValuesPerKey) {
+ val values = Array(valueToRow(idx * 10), valueToRow(idx * 10 + 1))
+ store.putList(keyRow, values)
+ } else {
+ store.put(keyRow, valueToRow(idx))
+ }
}
// For postfix encoder, add a different key to verify prefix scan
isolation
@@ -380,16 +529,24 @@ class RocksDBTimestampEncoderOperationsSuite extends
SharedSparkSession
val iter = encoderType match {
// For prefix encoder, we use iterator
case "prefix" =>
- store.iterator()
+ if (useMultipleValuesPerKey) {
+ store.iteratorWithMultiValues()
+ } else {
+ store.iterator()
+ }
// For postfix encoder, we use prefix scan with ("key1", 1) as the
prefix key
case "postfix" =>
- store.prefixScan(keyToRow("key1", 1))
+ if (useMultipleValuesPerKey) {
+ store.prefixScanWithMultiValues(keyToRow("key1", 1))
+ } else {
+ store.prefixScan(keyToRow("key1", 1))
+ }
}
val results = iter.map(_.key.getLong(2)).toList
iter.close()
- assert(results.length === diverseTimestamps.length)
+ assert(results.length === diverseTimestamps.length * valuesPerKey)
// Verify event times are in ascending order
val distinctEventTimes = results.distinct
@@ -405,9 +562,12 @@ class RocksDBTimestampEncoderOperationsSuite extends
SharedSparkSession
// encoding
Seq("unsaferow").foreach { encoding =>
Seq("prefix", "postfix").foreach { encoderType =>
- test(s"Event time as $encoderType: ordering with diverse timestamps" +
- s" (encoding = $encoding)") {
- testDiverseTimestampOrdering(encoderType, encoding)
+ Seq(false, true).foreach { useMultipleValuesPerKey =>
+ val multiValueSuffix = if (useMultipleValuesPerKey) " and multiple
values" else ""
+ test(s"Event time as $encoderType: ordering with diverse timestamps" +
+ s"$multiValueSuffix (encoding = $encoding)") {
+ testDiverseTimestampOrdering(encoderType, useMultipleValuesPerKey,
encoding)
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]