This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 56730f6390a1 [SPARK-46731][SS] Manage state store provider instance by state data source - reader
56730f6390a1 is described below
commit 56730f6390a19aeada75b866e64115a957212877
Author: Jungtaek Lim <[email protected]>
AuthorDate: Sat Jan 20 08:12:02 2024 +0900
    [SPARK-46731][SS] Manage state store provider instance by state data source - reader
### What changes were proposed in this pull request?
This PR proposes to change the reader part of the state data source to manage the state store provider instance by itself.
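A minimal sketch of the reader-managed lifecycle this PR moves to, condensed from the StatePartitionReader diff below; the method name `readStateAtBatch` and its parameters are placeholders for values the reader already holds, while the state store classes and calls mirror the ones used in the diff:

```scala
import java.util.UUID

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStoreConf, StateStoreId, StateStoreProvider, StateStoreProviderId}
import org.apache.spark.sql.types.StructType

object StateReaderSketch {
  // Sketch only: parameter values stand in for what StatePartitionReader already has.
  def readStateAtBatch(
      checkpointLocation: String,
      operatorId: Long,
      partitionId: Int,
      storeName: String,
      queryRunId: UUID,
      batchId: Long,
      keySchema: StructType,
      valueSchema: StructType,
      numColsPrefixKey: Int,
      storeConf: StateStoreConf,
      hadoopConf: Configuration): Unit = {
    val providerId = StateStoreProviderId(
      StateStoreId(checkpointLocation, operatorId, partitionId, storeName), queryRunId)

    // Create and initialize the provider directly: nothing is registered with the
    // state store coordinator, so no maintenance task runs for this provider.
    val provider: StateStoreProvider = StateStoreProvider.createAndInit(
      providerId, keySchema, valueSchema, numColsPrefixKey, storeConf, hadoopConf)
    try {
      val store: ReadStateStore = provider.getReadStore(batchId + 1)
      try {
        // Consume the rows for this version; the real reader projects key/value pairs instead.
        val rowCount = store.iterator().size
        println(s"read $rowCount state rows at version ${batchId + 1}")
      } finally {
        store.abort()
      }
    } finally {
      // The reader owns the provider it created, so it must also close it.
      provider.close()
    }
  }
}
```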
### Why are the changes needed?
Currently, the state data source initializes the state store instance via StateStore.get(), which also initializes the state store provider instance and registers it with the coordinator. This incurs unnecessary overhead, e.g. the maintenance task could be triggered for this provider.
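For the stream-stream join path, the same idea shows up as a new flag: with `useStateStoreCoordinator = false`, SymmetricHashJoinStateManager creates the provider itself instead of going through `StateStore.get()`. A hedged restatement of that branch follows; the class name `SelfManagedStoreLoader`, the `load` method, and the `providerHolder` field are illustrative stand-ins, not the real members, while the store-level calls match the diff below:

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreProvider, StateStoreProviderId}
import org.apache.spark.sql.types.StructType

// Sketch only: providerHolder plays the role of the stateStoreProvider field in
// StateStoreHandler; the real class derives storeProviderId from stateInfo and joinSide.
class SelfManagedStoreLoader(useStateStoreCoordinator: Boolean) {
  private var providerHolder: StateStoreProvider = _

  def load(
      storeProviderId: StateStoreProviderId,
      keySchema: StructType,
      valueSchema: StructType,
      storeVersion: Long,
      storeConf: StateStoreConf,
      hadoopConf: Configuration): StateStore = {
    if (useStateStoreCoordinator) {
      // Coordinator-managed path: the provider is registered and maintained by Spark.
      StateStore.get(
        storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0,
        storeVersion, storeConf, hadoopConf)
    } else {
      // Self-managed path: the caller owns the provider and must close it when done.
      providerHolder = StateStoreProvider.createAndInit(
        storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0, storeConf, hadoopConf)
      providerHolder.getStore(storeVersion)
    }
  }

  def close(): Unit = {
    if (providerHolder != null) {
      providerHolder.close()
    }
  }
}
```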
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44751 from HeartSaVioR/SPARK-46731.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../v2/state/StatePartitionReader.scala | 16 +++++++------
.../StreamStreamJoinStatePartitionReader.scala | 3 ++-
.../state/SymmetricHashJoinStateManager.scala | 28 ++++++++++++++++++----
3 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
index ef8d7bf628bf..b79079aca56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStoreConf, StateStoreId, StateStoreProvider, StateStoreProviderId}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -53,15 +53,13 @@ class StatePartitionReader(
   private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
   private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]
- private lazy val store: ReadStateStore = {
+ private lazy val provider: StateStoreProvider = {
     val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
-
     val allStateStoreMetadata = new StateMetadataPartitionReader(
       partition.sourceOptions.stateCheckpointLocation.getParent.toString, hadoopConf)
       .stateMetadata.toArray
-
val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
entry.operatorId == partition.sourceOptions.operatorId &&
entry.stateStoreName == partition.sourceOptions.storeName
@@ -78,9 +76,12 @@ class StatePartitionReader(
stateStoreMetadata.head.numColsPrefixKey
}
-    StateStore.getReadOnly(stateStoreProviderId, keySchema, valueSchema,
-      numColsPrefixKey = numColsPrefixKey, version = partition.sourceOptions.batchId + 1,
-      storeConf = storeConf, hadoopConf = hadoopConf.value)
+    StateStoreProvider.createAndInit(
+      stateStoreProviderId, keySchema, valueSchema, numColsPrefixKey, storeConf, hadoopConf.value)
+  }
+
+ private lazy val store: ReadStateStore = {
+ provider.getReadStore(partition.sourceOptions.batchId + 1)
}
private lazy val iter: Iterator[InternalRow] = {
@@ -104,6 +105,7 @@ class StatePartitionReader(
override def close(): Unit = {
current = null
store.abort()
+ provider.close()
}
private def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow)): InternalRow = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
index d0dd6cb7d1b9..e5a5dddefef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
@@ -115,7 +115,8 @@ class StreamStreamJoinStatePartitionReader(
hadoopConf = hadoopConf.value,
partitionId = partition.partition,
formatVersion,
- skippedNullValueCount = None
+ skippedNullValueCount = None,
+ useStateStoreCoordinator = false
)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index b67c5ffd09a1..58e5301ed559 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -50,6 +50,12 @@ import org.apache.spark.util.NextIterator
 * @param hadoopConf Hadoop configuration for reading state data from storage
* @param partitionId A partition ID of source RDD.
* @param stateFormatVersion The version of format for state.
+ * @param skippedNullValueCount The instance of SQLMetric tracking the number of skipped null
+ *                              values.
+ * @param useStateStoreCoordinator Whether to use a state store coordinator to maintain the state
+ *                                 store providers being used in this class. If true, Spark will
+ *                                 take care of management for state store providers, e.g. running
+ *                                 maintenance task for these providers.
*
* Internally, the key -> multiple values is stored in two [[StateStore]]s.
 * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values
@@ -79,7 +85,8 @@ class SymmetricHashJoinStateManager(
hadoopConf: Configuration,
partitionId: Int,
stateFormatVersion: Int,
- skippedNullValueCount: Option[SQLMetric] = None) extends Logging {
+ skippedNullValueCount: Option[SQLMetric] = None,
+ useStateStoreCoordinator: Boolean = true) extends Logging {
import SymmetricHashJoinStateManager._
/*
@@ -443,6 +450,7 @@ class SymmetricHashJoinStateManager(
/** Helper trait for invoking common functionalities of a state store. */
   private abstract class StateStoreHandler(stateStoreType: StateStoreType) extends Logging {
+ private var stateStoreProvider: StateStoreProvider = _
/** StateStore that the subclasses of this class is going to operate on */
protected def stateStore: StateStore
@@ -457,6 +465,11 @@ class SymmetricHashJoinStateManager(
logInfo(s"Aborted store ${stateStore.id}")
stateStore.abort()
}
+      // If this class manages a state store provider by itself, it should take care of closing
+      // provider instance as well.
+      if (stateStoreProvider != null) {
+        stateStoreProvider.close()
+      }
}
def metrics: StateStoreMetrics = stateStore.metrics
@@ -465,9 +478,16 @@ class SymmetricHashJoinStateManager(
     protected def getStateStore(keySchema: StructType, valueSchema: StructType): StateStore = {
       val storeProviderId = StateStoreProviderId(
         stateInfo.get, partitionId, getStateStoreName(joinSide, stateStoreType))
-      val store = StateStore.get(
-        storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0,
-        stateInfo.get.storeVersion, storeConf, hadoopConf)
+      val store = if (useStateStoreCoordinator) {
+        StateStore.get(
+          storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0,
+          stateInfo.get.storeVersion, storeConf, hadoopConf)
+      } else {
+        // This class will manage the state store provider by itself.
+        stateStoreProvider = StateStoreProvider.createAndInit(
+          storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0, storeConf, hadoopConf)
+        stateStoreProvider.getStore(stateInfo.get.storeVersion)
+      }
logInfo(s"Loaded store ${store.id}")
store
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]