This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cc9913bc9839 [SPARK-55129][SS] Introduce new key encoders for
timestamp as a first class (UnsafeRow)
cc9913bc9839 is described below
commit cc9913bc9839396fc6c6205bc891f0324fb5d480
Author: Jungtaek Lim <[email protected]>
AuthorDate: Sun Feb 15 17:12:45 2026 +0900
[SPARK-55129][SS] Introduce new key encoders for timestamp as a first class
(UnsafeRow)
### What changes were proposed in this pull request?
This PR proposes to introduce key encodings that treat the timestamp as a
first-class part of the key.
Proposed key encodings:
* `TimestampAsPrefixKeyStateEncoder` /
`TimestampAsPrefixKeyStateEncoderSpec`
* Place event time as a prefix, and key as remaining part of serialized
format
* `TimestampAsPostfixKeyStateEncoder` /
`TimestampAsPostfixKeyStateEncoderSpec`
* Place key first, and event time as a postfix of serialized format
The timestamp type is LongType (long). When serializing the timestamp, we
flip the sign bit and store the value in big-endian byte order. This makes the
lexicographic order of the encoded bytes match the natural ordering of long
across negative values, zero, and positive values. The serialization format of
the original key is unchanged; e.g. for UnsafeRow, it is the same as the
underlying binary format.
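The timestamp encoding described above can be sketched in isolation. This is a minimal illustration rather than the code in this PR: `TimestampOrderingSketch` and its method names are hypothetical, and the unsigned byte-wise comparison is assumed to stand in for the ordering the state store applies to keys.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.util.Arrays

// Hypothetical helper for illustration only; not part of Spark.
object TimestampOrderingSketch {
  private val SignMask: Long = 0x8000000000000000L

  // Flip the sign bit, then write the long in big-endian byte order.
  def encode(ts: Long): Array[Byte] =
    ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putLong(ts ^ SignMask).array()

  // Read the big-endian long and flip the sign bit back.
  def decode(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong() ^ SignMask

  // Unsigned, byte-by-byte comparison (assumed model of the key ordering).
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int =
    Arrays.compareUnsigned(a, b)
}
```

Without the sign flip, a negative timestamp would start with a byte whose high bit is set and would therefore sort after every non-negative timestamp under an unsigned comparison.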
These encodings are specializations of the prefix and range key encodings:
* `TimestampAsPrefixKeyStateEncoderSpec` provides the range scan with
timestamp.
* `TimestampAsPostfixKeyStateEncoderSpec` provides the prefix scan with the
key, additionally provides the range scan with the remaining timestamp. NOTE:
The range scan with timestamp is only scoped to the same key.
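The two layouts and the scans they enable can be modelled on raw byte arrays. The sketch below is an assumption-laden illustration: `TimestampKeyLayoutSketch` and its methods are hypothetical names, the key is an opaque byte array rather than a real UnsafeRow, and sorting with an unsigned byte comparison stands in for the store's key order.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.util.Arrays

// Hypothetical helper for illustration only; models an encoded key as raw bytes.
object TimestampKeyLayoutSketch {
  private val SignMask: Long = 0x8000000000000000L

  def encodeTs(ts: Long): Array[Byte] =
    ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putLong(ts ^ SignMask).array()

  def decodeTs(bytes: Array[Byte], offset: Int): Long =
    ByteBuffer.wrap(bytes, offset, 8).order(ByteOrder.BIG_ENDIAN).getLong() ^ SignMask

  // Prefix layout: [8-byte timestamp][key bytes]
  def asPrefix(key: Array[Byte], ts: Long): Array[Byte] = encodeTs(ts) ++ key

  // Postfix layout: [key bytes][8-byte timestamp]
  def asPostfix(key: Array[Byte], ts: Long): Array[Byte] = key ++ encodeTs(ts)

  def splitPrefix(bytes: Array[Byte]): (Long, Array[Byte]) =
    (decodeTs(bytes, 0), bytes.drop(8))

  def splitPostfix(bytes: Array[Byte]): (Array[Byte], Long) =
    (bytes.dropRight(8), decodeTs(bytes, bytes.length - 8))

  // Sort encoded keys the way an unsigned byte-wise comparator would.
  def sortEncoded(keys: Seq[Array[Byte]]): Seq[Array[Byte]] =
    keys.sortWith((a, b) => Arrays.compareUnsigned(a, b) < 0)
}
```

In this model, sorting prefix-layout keys orders all entries by timestamp regardless of key, while sorting postfix-layout keys groups entries by key first and orders them by timestamp only within each key, which matches the note that the range scan is scoped to the same key.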
Compared to the prefix/range key encodings, this eliminates the overhead of
combining two UnsafeRows, which is at least 12 bytes per key overall (8 bytes
for the null-tracking bitset and 4 bytes for storing the length of one of the
two UnsafeRows). It can also skip the projection(s) during deserialization.
To cope with the existing StateStore API, which has no concept of timestamp
at the API layer, we require the caller to project the key row to attach the
timestamp manually before calling the StateStore API. Conversely, the key row
produced by the StateStore API also takes the form of original row + timestamp,
and the caller is responsible for projecting the original row out of the
returned row.
Note: the performance is not optimal, since projections (array creation and
memcpy) happen in multiple places; eliminating them will require an API-level
change.
The change is already big, so this PR only enables the new key encodings
with UnsafeRow. Supporting Avro will be follow-up work.
### Why are the changes needed?
The existing key encodings are general-purpose, so serving this use case with
them incurs noticeable overhead in terms of additional bytes in the serialized
format. The proposed key encodings serve the same purpose with minimal
overhead, given that they only need to handle a timestamp along with the key.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test suite.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude-4.5-sonnet
The above tool was used to create the new test suite. No other part was
generated by an LLM.
Closes #53911 from HeartSaVioR/SPARK-55129.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/state/RocksDBStateEncoder.scala | 205 ++++++++-
.../sql/execution/streaming/state/StateStore.scala | 31 +-
.../RocksDBTimestampEncoderOperationsSuite.scala | 491 +++++++++++++++++++++
3 files changed, 725 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index 4111e7b42623..c8f68dfd0388 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -34,8 +34,9 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions,
AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow,
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow,
Literal, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
import
org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
import
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.StateStoreColumnFamilySchemaUtils
import
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{SCHEMA_ID_PREFIX_BYTES,
STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
@@ -846,6 +847,8 @@ class AvroStateEncoder(
}
}
StructType(remainingSchema)
+ case _ =>
+ throw unsupportedOperationForKeyStateEncoder("createAvroEnc")
}
// Handle suffix key schema for prefix scan case
@@ -1713,6 +1716,206 @@ class NoPrefixKeyStateEncoder(
}
}
+/**
+ * The singleton instance to provide utility-like methods for key state encoders which include
+ * timestamp, specifically [[TimestampAsPrefixKeyStateEncoder]] and
+ * [[TimestampAsPostfixKeyStateEncoder]].
+ */
+object TimestampKeyStateEncoder {
+ private val INTERNAL_TIMESTAMP_COLUMN_NAME = "__event_time"
+
+ def keySchemaWithTimestamp(keySchema: StructType): StructType = {
+ StructType(keySchema.fields)
+ .add(name = INTERNAL_TIMESTAMP_COLUMN_NAME, dataType = LongType,
nullable = false)
+ }
+
+ def getAttachTimestampProjection(keyWithoutTimestampSchema: StructType):
UnsafeProjection = {
+ val refs = keyWithoutTimestampSchema.zipWithIndex.map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(
+ refs :+ Literal(0L), // placeholder for timestamp
+ DataTypeUtils.toAttributes(StructType(keyWithoutTimestampSchema)))
+ }
+
+ def getDetachTimestampProjection(keyWithTimestampSchema: StructType):
UnsafeProjection = {
+ val refs = keyWithTimestampSchema.zipWithIndex.dropRight(1).map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(refs)
+ }
+
+ def attachTimestamp(
+ attachTimestampProjection: UnsafeProjection,
+ keyWithTimestampSchema: StructType,
+ key: UnsafeRow,
+ timestamp: Long): UnsafeRow = {
+ val rowWithTimestamp = attachTimestampProjection(key)
+ rowWithTimestamp.setLong(keyWithTimestampSchema.length - 1, timestamp)
+ rowWithTimestamp
+ }
+
+ def extractTimestamp(key: UnsafeRow): Long = {
+ key.getLong(key.numFields - 1)
+ }
+}
+
+/**
+ * The abstract base class for key state encoders which include timestamp, specifically
+ * [[TimestampAsPrefixKeyStateEncoder]] and [[TimestampAsPostfixKeyStateEncoder]].
+ */
+abstract class TimestampKeyStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ keySchema: StructType)
+ extends RocksDBKeyStateEncoder with Logging {
+
+ protected val detachTimestampProjection: UnsafeProjection =
+ TimestampKeyStateEncoder.getDetachTimestampProjection(keySchema)
+
+ protected val attachTimestampProjection: UnsafeProjection =
+ TimestampKeyStateEncoder.getAttachTimestampProjection(
+ StructType(keySchema.fields.dropRight(1)))
+
+ protected def decodeKey(keyBytes: Array[Byte], startPos: Int): UnsafeRow = {
+ val rowBytesLength = keyBytes.length - 8
+ val rowBytes = new Array[Byte](rowBytesLength)
+ Platform.copyMemory(
+ keyBytes, Platform.BYTE_ARRAY_OFFSET + startPos,
+ rowBytes, Platform.BYTE_ARRAY_OFFSET,
+ rowBytesLength
+ )
+ // The encoded row does not include the timestamp (it's stored separately),
+ // so decode with keySchema.length - 1 fields.
+ dataEncoder.decodeToUnsafeRow(rowBytes, keySchema.length - 1)
+ }
+
+ // NOTE: We reuse the ByteBuffer to avoid allocating a new one for every encoding/decoding,
+ // which means the encoder is not thread-safe. Built-in operators do not access the encoder in
+ // multiple threads, but if we are concerned about thread-safety in the future, we can maintain
+ // the thread-local of ByteBuffer to retain the reusability of the instance while avoiding
+ // thread-safety issue. We do not use position - we always put/get at offset 0.
+ private val buffForBigEndianLong = ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN)
+
+ private val SIGN_MASK_FOR_LONG: Long = 0x8000000000000000L
+
+ protected def encodeTimestamp(timestamp: Long): Array[Byte] = {
+ // Flip the sign bit to ensure correct lexicographical ordering, even for negative timestamps.
+ // We should flip the sign bit back when decoding the timestamp.
+ val signFlippedTimestamp = timestamp ^ SIGN_MASK_FOR_LONG
+ buffForBigEndianLong.putLong(0, signFlippedTimestamp)
+ buffForBigEndianLong.array()
+ }
+
+ protected def decodeTimestamp(keyBytes: Array[Byte], startPos: Int): Long = {
+ buffForBigEndianLong.put(0, keyBytes, startPos, 8)
+ val signFlippedTimestamp = buffForBigEndianLong.getLong(0)
+ // Flip the sign bit back to get the original timestamp.
+ signFlippedTimestamp ^ SIGN_MASK_FOR_LONG
+ }
+
+ protected def attachTimestamp(key: UnsafeRow, timestamp: Long): UnsafeRow = {
+ TimestampKeyStateEncoder.attachTimestamp(attachTimestampProjection,
keySchema, key, timestamp)
+ }
+
+ protected def detachTimestamp(key: UnsafeRow): UnsafeRow = {
+ detachTimestampProjection(key)
+ }
+
+ def extractTimestamp(key: UnsafeRow): Long = {
+ TimestampKeyStateEncoder.extractTimestamp(key)
+ }
+}
+
+/**
+ * Encodes row with timestamp as prefix of the key, so that they can be scanned based on
+ * timestamp ordering.
+ *
+ * The encoder expects the provided key schema to have [original key fields..., timestamp field].
+ * The key has to conform to this schema when putting/getting from the state store. The schema
+ * needs to be built via calling [[TimestampKeyStateEncoder.keySchemaWithTimestamp()]].
+ */
+class TimestampAsPrefixKeyStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ keySchema: StructType,
+ useColumnFamilies: Boolean = false)
+ extends TimestampKeyStateEncoder(dataEncoder, keySchema) with Logging {
+
+ override def supportPrefixKeyScan: Boolean = false
+
+ override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key without event time!")
+ }
+
+ override def encodeKey(row: UnsafeRow): Array[Byte] = {
+ val prefix = dataEncoder.encodeKey(detachTimestamp(row))
+ val timestamp = extractTimestamp(row)
+
+ val byteArray = new Array[Byte](prefix.length + 8)
+ Platform.copyMemory(
+ encodeTimestamp(timestamp), Platform.BYTE_ARRAY_OFFSET,
+ byteArray, Platform.BYTE_ARRAY_OFFSET, 8)
+ Platform.copyMemory(prefix, Platform.BYTE_ARRAY_OFFSET,
+ byteArray, Platform.BYTE_ARRAY_OFFSET + 8, prefix.length)
+
+ byteArray
+ }
+
+ override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+ val timestamp = decodeTimestamp(keyBytes, 0)
+ val row = decodeKey(keyBytes, 8)
+ attachTimestamp(row, timestamp)
+ }
+
+ // TODO: [SPARK-55491] Revisit this to support delete range if needed.
+ override def supportsDeleteRange: Boolean = false
+}
+
+/**
+ * Encodes row with timestamp as postfix of the key, so that prefix scan with the keys
+ * having the same key but different timestamps is supported. In addition, timestamp is stored
+ * in sort order to support timestamp ordered iteration in the result of prefix scan.
+ *
+ * The encoder expects the provided key schema to have [original key fields..., timestamp field].
+ * The key has to be conformed to this schema when putting/getting from the state store. The schema
+ * needs to be built via calling [[TimestampKeyStateEncoder.keySchemaWithTimestamp()]].
+ */
+class TimestampAsPostfixKeyStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ keySchema: StructType,
+ useColumnFamilies: Boolean = false)
+ extends TimestampKeyStateEncoder(dataEncoder, keySchema) with Logging {
+
+ override def supportPrefixKeyScan: Boolean = true
+
+ override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+ dataEncoder.encodeKey(prefixKey)
+ }
+
+ override def encodeKey(row: UnsafeRow): Array[Byte] = {
+ val prefix = dataEncoder.encodeKey(detachTimestamp(row))
+ val timestamp = extractTimestamp(row)
+
+ val byteArray = new Array[Byte](prefix.length + 8)
+
+ Platform.copyMemory(prefix, Platform.BYTE_ARRAY_OFFSET,
+ byteArray, Platform.BYTE_ARRAY_OFFSET, prefix.length)
+ Platform.copyMemory(
+ encodeTimestamp(timestamp), Platform.BYTE_ARRAY_OFFSET,
+ byteArray, Platform.BYTE_ARRAY_OFFSET + prefix.length,
+ 8
+ )
+
+ byteArray
+ }
+
+ override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+ val row = decodeKey(keyBytes, 0)
+ val rowBytesLength = keyBytes.length - 8
+ val timestamp = decodeTimestamp(keyBytes, rowBytesLength)
+ attachTimestamp(row, timestamp)
+ }
+
+ override def supportsDeleteRange: Boolean = false
+}
+
/**
* Supports encoding multiple values per key in RocksDB.
* A single value is encoded in the format below, where first value is number
of bytes
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 3f90cb4edbc7..acb82680d279 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -638,6 +638,36 @@ case class RangeKeyScanStateEncoderSpec(
}
}
+/** The encoder specification for [[TimestampAsPrefixKeyStateEncoder]]. */
+case class TimestampAsPrefixKeyStateEncoderSpec(keySchema: StructType)
+ extends KeyStateEncoderSpec {
+
+ override def toEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ useColumnFamilies: Boolean): RocksDBKeyStateEncoder = {
+ new TimestampAsPrefixKeyStateEncoder(dataEncoder, keySchema,
useColumnFamilies)
+ }
+
+ override def jsonValue: JValue = {
+ "keyStateEncoderType" -> JString("TimestampAsPrefixKeyStateEncoderSpec")
+ }
+}
+
+/** The encoder specification for [[TimestampAsPostfixKeyStateEncoder]]. */
+case class TimestampAsPostfixKeyStateEncoderSpec(keySchema: StructType)
+ extends KeyStateEncoderSpec {
+
+ override def toEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ useColumnFamilies: Boolean): RocksDBKeyStateEncoder = {
+ new TimestampAsPostfixKeyStateEncoder(dataEncoder, keySchema,
useColumnFamilies)
+ }
+
+ override def jsonValue: JValue = {
+ "keyStateEncoderType" -> JString("TimestampAsPostfixKeyStateEncoderSpec")
+ }
+}
+
/**
* Trait representing a provider that provide [[StateStore]] instances
representing
* versions of state data.
@@ -1081,7 +1111,6 @@ class UnsafeRowPair(var key: UnsafeRow = null, var value:
UnsafeRow = null) {
}
}
-
/**
* Companion object to [[StateStore]] that provides helper methods to create
and retrieve stores
* by their unique ids. In addition, when a SparkContext is active (i.e.
SparkEnv.get is not null),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
new file mode 100644
index 000000000000..498ac7db20b6
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.streaming.runtime.StreamExecution
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.tags.ExtendedSQLTest
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Test suite for RocksDB state store operations with timestamp key encoders,
mostly for
+ * [[TimestampAsPrefixKeyStateEncoder]] and
[[TimestampAsPostfixKeyStateEncoder]].
+ */
+@ExtendedSQLTest
+class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
+ with BeforeAndAfterEach with Matchers {
+
+ // Test schemas
+ private val keySchema = StructType(Seq(
+ StructField("key", StringType, nullable = true),
+ StructField("partitionId", IntegerType, nullable = true)
+ ))
+ private val valueSchema = StructType(Seq(StructField("value", IntegerType,
nullable = true)))
+
+ // Column family names for testing
+ private val testColFamily = "test_cf"
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ StateStore.stop()
+ require(!StateStore.isMaintenanceRunning)
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ }
+
+ override def afterEach(): Unit = {
+ StateStore.stop()
+ require(!StateStore.isMaintenanceRunning)
+ super.afterEach()
+ }
+
+ private def newDir(): String = Utils.createTempDir().getCanonicalPath
+
+ // TODO: [SPARK-55145] Address the new state format with Avro and enable the
test with Avro
+ // encoding
+ Seq("unsaferow").foreach { encoding =>
+ Seq("prefix", "postfix").foreach { encoderType =>
+ test(s"Event time as $encoderType: basic put and get operations (encoding = $encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = encoderType, dataEncoding = encoding)) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ // Test put and get
+ val keyWithTimestamp1 = keyAndTimestampToRow("key1", 1, 1000L)
+ val value1 = valueToRow(100)
+
+ store.put(keyWithTimestamp1, value1)
+ val retrievedValue = store.get(keyWithTimestamp1)
+
+ assert(retrievedValue != null)
+ assert(retrievedValue.getInt(0) === 100)
+
+ // Test get with different event time should return null
+ val keyWithTimestamp2 = keyAndTimestampToRow("key1", 1, 2000L)
+ assert(store.get(keyWithTimestamp2) === null)
+
+ // Test with different key should return null
+ val keyWithTimestamp3 = keyAndTimestampToRow("key2", 1, 1000L)
+ assert(store.get(keyWithTimestamp3) === null)
+ } finally {
+ store.abort()
+ }
+ }
+ }
+
+ test(s"Event time as $encoderType: remove operations (encoding = $encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = encoderType, dataEncoding = encoding)) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ val keyWithTimestamp1 = keyAndTimestampToRow("key1", 1, 1000L)
+ val value1 = valueToRow(100)
+
+ // Put and verify
+ store.put(keyWithTimestamp1, value1)
+ assert(store.get(keyWithTimestamp1) != null)
+
+ // Remove and verify
+ store.remove(keyWithTimestamp1)
+ assert(store.get(keyWithTimestamp1) === null)
+
+ // Removing non-existent key should not throw error
+ store.remove(keyAndTimestampToRow("nonexistent", 1, 2000L))
+ } finally {
+ store.abort()
+ }
+ }
+ }
+
+ test(s"Event time as $encoderType: multiple values per key (encoding = $encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = encoderType,
+ useMultipleValuesPerKey = true,
+ dataEncoding = encoding)
+ ) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ val keyWithTimestamp1 = keyAndTimestampToRow("key1", 1, 1000L)
+ val values = Array(valueToRow(100), valueToRow(200),
valueToRow(300))
+
+ // Test putList
+ store.putList(keyWithTimestamp1, values)
+
+ // Test valuesIterator
+ val retrievedValues =
+ store.valuesIterator(keyWithTimestamp1).map(_.copy()).toList
+ assert(retrievedValues.length === 3)
+ assert(
+ retrievedValues.map(_.getInt(0)).sorted === Array(
+ 100,
+ 200,
+ 300
+ ).sorted
+ )
+
+ // Test with different event time should return empty iterator
+ val keyWithTimestamp2 = keyAndTimestampToRow("key1", 1, 2000L)
+ val emptyIterator = store.valuesIterator(keyWithTimestamp2)
+ assert(!emptyIterator.hasNext)
+ } finally {
+ store.abort()
+ }
+ }
+ }
+
+ test(s"Event time as $encoderType: merge operations (encoding = $encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = encoderType,
+ useMultipleValuesPerKey = true,
+ dataEncoding = encoding)
+ ) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ val keyWithTimestamp1 = keyAndTimestampToRow("key1", 1, 1000L)
+ val value1 = valueToRow(100)
+ val value2 = valueToRow(200)
+
+ // Test merge single values
+ store.merge(keyWithTimestamp1, value1)
+ store.merge(keyWithTimestamp1, value2)
+
+ val retrievedValues =
+ store.valuesIterator(keyWithTimestamp1).map(_.copy()).toList
+ assert(retrievedValues.length === 2)
+ assert(retrievedValues.map(_.getInt(0)).toSet === Set(100, 200))
+
+ // Test mergeList
+ val additionalValues = Array(valueToRow(300), valueToRow(400))
+ store.mergeList(keyWithTimestamp1, additionalValues)
+
+ val allValues =
store.valuesIterator(keyWithTimestamp1).map(_.copy()).toList
+ assert(allValues.length === 4)
+ assert(allValues.map(_.getInt(0)).toSet === Set(100, 200, 300,
400))
+ } finally {
+ store.abort()
+ }
+ }
+ }
+
+ test(s"Event time as $encoderType: null value validation (encoding = $encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = encoderType, dataEncoding = encoding)) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ val keyWithTimestamp = keyAndTimestampToRow("key1", 1, 1000L)
+
+ // Test null value should throw exception
+ intercept[IllegalArgumentException] {
+ store.put(keyWithTimestamp, null)
+ }
+ } finally {
+ store.abort()
+ }
+ }
+ }
+ }
+ }
+
+ // TODO: Address the new state format with Avro and enable the test with
Avro encoding
+ Seq("unsaferow").foreach { encoding =>
+ test(s"Event time as prefix: iterator operations (encoding = $encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = "prefix", dataEncoding = encoding)) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ val entries = Map(
+ keyAndTimestampToRow("key1", 1, 2000L) -> valueToRow(100),
+ keyAndTimestampToRow("key2", 1, 1000L) -> valueToRow(200),
+ keyAndTimestampToRow("key1", 2, -3000L) -> valueToRow(300)
+ )
+
+ // Put all entries (in non-sorted order)
+ entries.foreach { case (keyAndTimestampRow, value) =>
+ store.put(keyAndTimestampRow, value)
+ }
+
+ // Test iterator - should return all entries ordered by event time
+ val iterator = store.iterator()
+ val results = iterator.map { pair =>
+ assert(pair.key.numFields() === 3) // key fields + timestamp
+
+ val keyString = pair.key.getString(0)
+ val partitionId = pair.key.getInt(1)
+ // The timestamp will be placed at the end of the key row.
+ val timestamp = pair.key.getLong(2)
+ val value = pair.value.getInt(0)
+ (keyString, partitionId, timestamp, value)
+ }.toList
+
+ iterator.close()
+
+ assert(results.length === 3)
+
+ // Verify results are ordered by event time (ascending)
+ val eventTimes = results.map(_._3)
+ assert(
+ eventTimes === Seq(-3000L, 1000L, 2000L),
+ "Results should be ordered by event time"
+ )
+
+ // Verify all expected entries are present
+ val retrievedEntries = results.map {
+ case (key, partId, time, value) =>
+ ((key, partId, time), value)
+ }.toMap
+ assert(retrievedEntries(("key1", 2, -3000L)) === 300)
+ assert(retrievedEntries(("key2", 1, 1000L)) === 200)
+ assert(retrievedEntries(("key1", 1, 2000L)) === 100)
+ } finally {
+ store.abort()
+ }
+ }
+ }
+
+ test(s"Event time as postfix: prefix scan operations (encoding = $encoding)") {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(encoderType = "postfix",
dataEncoding = encoding)
+ ) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ // Put entries with the same complete key but different event times
+ // Prefix scan should find all entries with the same key across
different event times
+
+ // Insert in non-sorted order to verify that prefix scan returns
them sorted by time
+ store.put(keyAndTimestampToRow("key1", 1, 2000L), valueToRow(102))
+ store.put(keyAndTimestampToRow("key1", 1, -3000L), valueToRow(100))
+ store.put(keyAndTimestampToRow("key1", 1, 1000L), valueToRow(101))
+
+ // Different key (key2, 1) - should not be returned
+ store.put(keyAndTimestampToRow("key2", 1, 1500L), valueToRow(200))
+
+ // Test prefixScan - pass the complete key to find all event times
for that key
+ val iterator = store.prefixScan(keyToRow("key1", 1))
+
+ val results = iterator.map { pair =>
+ assert(pair.key.numFields() === 3) // key fields + timestamp
+
+ val keyStr = pair.key.getString(0)
+ val partitionId = pair.key.getInt(1)
+ // The timestamp will be placed at the end of the key row.
+ val timestamp = pair.key.getLong(2)
+ val value = pair.value.getInt(0)
+ (keyStr, partitionId, timestamp, value)
+ }.toList
+ iterator.close()
+
+ // Should return all entries with the same complete key but
different event times
+ assert(results.length === 3)
+
+ // Verify results are ordered by event time (ascending)
+ val eventTimes = results.map(_._3)
+ assert(
+ eventTimes === Seq(-3000L, 1000L, 2000L),
+ "Results should be ordered by event time"
+ )
+
+ // Verify all expected entries are present
+ assert(results(0) === (("key1", 1, -3000L, 100)))
+ assert(results(1) === (("key1", 1, 1000L, 101)))
+ assert(results(2) === (("key1", 1, 2000L, 102)))
+
+ // Should not contain key2
+ assert(!results.exists(_._1 == "key2"))
+ } finally {
+ store.abort()
+ }
+ }
+ }
+ }
+
+ // Diverse set of timestamps that exercise binary lexicographic encoding
edge cases,
+ // reusing the same values from range scan encoder tests in
RocksDBStateStoreSuite,
+ // including large negatives, small negatives, zero, small positives, large
positives,
+ // and powers of 2.
+ private val diverseTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L,
1L, 2L, 8L,
+ -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L,
+ -32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
+
+ /**
+ * Tests that the given encoder type correctly orders entries by event time
when using
+ * diverse timestamp values that exercise binary lexicographic encoding edge
cases.
+ *
+ * @param encoderType "prefix" or "postfix"
+ * @param encoding data encoding format (e.g. "unsaferow")
+ */
+ private def testDiverseTimestampOrdering(
+ encoderType: String,
+ encoding: String): Unit = {
+ tryWithProviderResource(
+ newStoreProviderWithTimestampEncoder(
+ encoderType = encoderType,
+ useMultipleValuesPerKey = false,
+ dataEncoding = encoding)
+ ) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ // Insert diverse timestamps in non-sorted order
+ diverseTimestamps.zipWithIndex.foreach { case (ts, idx) =>
+ val keyRow = keyAndTimestampToRow("key1", 1, ts)
+ store.put(keyRow, valueToRow(idx))
+ }
+
+ // For postfix encoder, add a different key to verify prefix scan
isolation
+ if (encoderType == "postfix") {
+ store.put(keyAndTimestampToRow("key2", 1, 500L), valueToRow(999))
+ }
+
+ // Read results back using the appropriate scan method
+ val iter = encoderType match {
+ // For prefix encoder, we use iterator
+ case "prefix" =>
+ store.iterator()
+ // For postfix encoder, we use prefix scan with ("key1", 1) as the
prefix key
+ case "postfix" =>
+ store.prefixScan(keyToRow("key1", 1))
+ }
+
+ val results = iter.map(_.key.getLong(2)).toList
+ iter.close()
+
+ assert(results.length === diverseTimestamps.length)
+
+ // Verify event times are in ascending order
+ val distinctEventTimes = results.distinct
+ assert(distinctEventTimes === diverseTimestamps.sorted,
+ "Results should be ordered by event time")
+ } finally {
+ store.abort()
+ }
+ }
+ }
+
+ // TODO: [SPARK-55145] Address the new state format with Avro and enable the
test with Avro
+ // encoding
+ Seq("unsaferow").foreach { encoding =>
+ Seq("prefix", "postfix").foreach { encoderType =>
+ test(s"Event time as $encoderType: ordering with diverse timestamps" +
+ s" (encoding = $encoding)") {
+ testDiverseTimestampOrdering(encoderType, encoding)
+ }
+ }
+ }
+
+ // Helper methods to create test data
+ private val keyProjection = UnsafeProjection.create(keySchema)
+ private val keyAndTimestampProjection = UnsafeProjection.create(
+ TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema)
+ )
+
+ private def keyToRow(key: String, partitionId: Int): UnsafeRow = {
+ keyProjection.apply(InternalRow(UTF8String.fromString(key),
partitionId)).copy()
+ }
+
+ private def keyAndTimestampToRow(key: String, partitionId: Int, timestamp:
Long): UnsafeRow = {
+ keyAndTimestampProjection.apply(
+ InternalRow(UTF8String.fromString(key), partitionId, timestamp)).copy()
+ }
+
+ private def valueToRow(value: Int): UnsafeRow = {
+ UnsafeProjection.create(valueSchema).apply(InternalRow(value)).copy()
+ }
+
+ // Helper to create a new store provider with timestamp encoder
+ private def newStoreProviderWithTimestampEncoder(
+ encoderType: String, // "prefix" or "postfix"
+ useColumnFamilies: Boolean = true,
+ useMultipleValuesPerKey: Boolean = false,
+ dataEncoding: String = "unsaferow"): RocksDBStateStoreProvider = {
+
+ val keyStateEncoderSpec = encoderType match {
+ case "prefix" => TimestampAsPrefixKeyStateEncoderSpec(
+ TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema))
+ case "postfix" => TimestampAsPostfixKeyStateEncoderSpec(
+ TimestampKeyStateEncoder.keySchemaWithTimestamp(keySchema))
+ case _ => throw new IllegalArgumentException(s"Unknown encoder type: $encoderType")
+ }
+
+ // Create a copy of SQLConf and set data encoding format on it
+ val sqlConf = SQLConf.get.clone()
+ sqlConf.setConfString(
+ "spark.sql.streaming.stateStore.encodingFormat",
+ dataEncoding)
+
+ val provider = new RocksDBStateStoreProvider()
+ val stateStoreId = StateStoreId(newDir(), Random.nextInt(), 0)
+ val conf = new Configuration
+ conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+ val testProvider = new TestStateSchemaProvider()
+ testProvider.captureSchema(
+ testColFamily,
+ keySchema,
+ valueSchema
+ )
+
+ val storeConf = new StateStoreConf(sqlConf)
+ provider.init(
+ stateStoreId,
+ keySchema,
+ valueSchema,
+ keyStateEncoderSpec,
+ useColumnFamilies,
+ storeConf,
+ conf,
+ useMultipleValuesPerKey,
+ Some(testProvider))
+
+ provider
+ }
+
+ // Helper method for resource management
+ private def tryWithProviderResource[T](provider: RocksDBStateStoreProvider)
+ (f: RocksDBStateStoreProvider => T): T = {
+ try {
+ f(provider)
+ } finally {
+ provider.close()
+ }
+ }
+}