This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0b0933c52585 [SPARK-49050] Enabling deleteIfExists operator in TWS
with Virtual Column Families
0b0933c52585 is described below
commit 0b0933c525855dc3fa63e83fca7191a1919d502c
Author: Eric Marnadi <[email protected]>
AuthorDate: Wed Aug 28 15:40:20 2024 +0900
[SPARK-49050] Enabling deleteIfExists operator in TWS with Virtual Column
Families
### What changes were proposed in this pull request?
Fully integrating the TransformWithState operator with Virtual Column
Families by adding support for the deleteIfExists operator. In RocksDB.scala,
return the columnFamilyId if it exists in `removeColFamilyIfExists` so that we
can remove all the keys for the corresponding columnFamily once deleteIfExists
is called.
### Why are the changes needed?
To add support for the deleteIfExists operator with VCF, as this
functionality was not working when virtual column families were introduced.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Enabled previously disabled unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47880 from ericm-db/tws-deleteIfExists.
Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 8 ++++----
.../streaming/state/RocksDBStateEncoder.scala | 6 ++++++
.../streaming/state/RocksDBStateStoreProvider.scala | 20 +++++++++++---------
.../state/StateSchemaCompatibilityChecker.scala | 12 +++++++++---
.../sql/streaming/TransformWithStateSuite.scala | 3 +--
5 files changed, 31 insertions(+), 18 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 64b3c3646063..7badc26bf044 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -214,14 +214,14 @@ class RocksDB(
/**
* Remove RocksDB column family, if exists
+ * @return columnFamilyId if it exists, else None
*/
- def removeColFamilyIfExists(colFamilyName: String): Boolean = {
+ def removeColFamilyIfExists(colFamilyName: String): Option[Short] = {
if (checkColFamilyExists(colFamilyName)) {
- colFamilyNameToIdMap.remove(colFamilyName)
shouldForceSnapshot.set(true)
- true
+ Some(colFamilyNameToIdMap.remove(colFamilyName))
} else {
- false
+ None
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index 202fd224ddfd..4c7a226e0973 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -123,6 +123,12 @@ object RocksDBStateEncoder {
}
}
+ def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = {
+ val encodedBytes = new Array[Byte](VIRTUAL_COL_FAMILY_PREFIX_BYTES)
+    Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, virtualColFamilyId)
+ encodedBytes
+ }
+
/**
* Encode the UnsafeRow of N bytes as a N+1 byte array.
   * @note This creates a new byte array and memcopies the UnsafeRow to the new array.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 075ab7d00842..685cc9a1533e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -321,16 +321,18 @@ private[sql] class RocksDBStateStoreProvider
    verify(useColumnFamilies, "Column families are not supported in this store")
val result = {
- val colFamilyExists = rocksDB.removeColFamilyIfExists(colFamilyName)
-
- if (colFamilyExists) {
- val colFamilyIdBytes =
- keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes()
- rocksDB.prefixScan(colFamilyIdBytes).foreach { kv =>
- rocksDB.remove(kv.key)
- }
+ val colFamilyId = rocksDB.removeColFamilyIfExists(colFamilyName)
+
+ colFamilyId match {
+ case Some(vcfId) =>
+ val colFamilyIdBytes =
+ RocksDBStateEncoder.getColumnFamilyIdBytes(vcfId)
+ rocksDB.prefixScan(colFamilyIdBytes).foreach { kv =>
+ rocksDB.remove(kv.key)
+ }
+ true
+ case None => false
}
- colFamilyExists
}
keyValueEncoderMap.remove(colFamilyName)
result
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index ca03de6f1ad3..47b1cb90e00a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -176,9 +176,15 @@ class StateSchemaCompatibilityChecker(
true
} else {
// validate if the new schema is compatible with the existing schema
- existingStateSchemaList.lazyZip(newStateSchemaList).foreach {
- case (existingStateSchema, newStateSchema) =>
- check(existingStateSchema, newStateSchema, ignoreValueSchema)
+ val existingSchemaMap = existingStateSchemaList.map { schema =>
+ schema.colFamilyName -> schema
+ }.toMap
+      // For each new state variable, we want to compare it to the old state variable
+      // schema with the same name
+ newStateSchemaList.foreach { newSchema =>
+        existingSchemaMap.get(newSchema.colFamilyName).foreach { existingStateSchema =>
+ check(existingStateSchema, newSchema, ignoreValueSchema)
+ }
}
false
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index bd18fd83e43a..9eeedd859809 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -725,8 +725,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
checkAnswer(df, Seq(("a", "1"), ("b", "1")).toDF())
}
- // TODO SPARK-48796 after restart state id will not be the same
- ignore("transformWithState - test deleteIfExists operator") {
+ test("transformWithState - test deleteIfExists operator") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]