This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 4caa43e [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide
4caa43e is described below
commit 4caa43e3989998506076c24d3b7020d986249e9f
Author: Yuanjian Li <[email protected]>
AuthorDate: Mon Aug 16 12:32:08 2021 -0700
[SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide
### What changes were proposed in this pull request?
Add the document for the new RocksDBStateStoreProvider.
### Why are the changes needed?
User guide for the new feature.
### Does this PR introduce _any_ user-facing change?
No, doc only.
### How was this patch tested?
Doc only.
Closes #33683 from xuanyuanking/SPARK-36041.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 3d57e00a7f8d8f2f7dc0bfbfb0466ef38fb3da08)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
docs/structured-streaming-programming-guide.md | 75 +++++++++++++++++++++++++-
1 file changed, 74 insertions(+), 1 deletion(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index b56d8c8..3f89111 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1870,7 +1870,80 @@ hence the number is not same as the number of original input rows. You'd like to
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
-### State Store and task locality
+### State Store
+
+A state store is a versioned key-value store that provides both read and write operations. In
+Structured Streaming, we use the state store provider to handle the stateful operations across
+batches. There are two built-in state store provider implementations. End users can also implement
+their own state store provider by extending the StateStoreProvider interface.
+
+#### HDFS state store provider
+
+The HDFS-backed state store provider is the default implementation of `StateStoreProvider` and
+`StateStore`, in which all the data is first stored in an in-memory map and then backed
+by files in an HDFS-compatible file system. All updates to the store have to be done in sets
+transactionally, and each set of updates increments the store's version. These versions can be
+used to re-execute the updates (by retries in RDD operations) on the correct version of the store,
+and regenerate the store version.
+
+#### RocksDB state store implementation
+
+As of Spark 3.2, we add a new built-in state store implementation, the RocksDB state store provider.
+
+If you have stateful operations in your streaming query (for example, streaming aggregation,
+streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or flatMapGroupsWithState)
+and you want to maintain millions of keys in the state, then you may face issues related to large
+JVM garbage collection (GC) pauses causing high variations in the micro-batch processing times.
+This occurs because, in the HDFSBackedStateStore implementation, the state data is maintained
+in the JVM memory of the executors, and a large number of state objects puts memory pressure on the
+JVM, causing long GC pauses.
+
+In such cases, you can choose to use a more optimized state management solution based on
+[RocksDB](https://rocksdb.org/). Rather than keeping the state in the JVM memory, this solution
+uses RocksDB to efficiently manage the state in the native memory and the local disk. Furthermore,
+any changes to this state are automatically saved by Structured Streaming to the checkpoint
+location you have provided, thus providing full fault-tolerance guarantees (the same as the default
+state management).
+
+To enable the new built-in state store implementation, set `spark.sql.streaming.stateStore.providerClass`
+to `org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider`.
+
+Here are the configs for the RocksDB instance of the state store provider:
+
+<table class="table">
+ <tr>
+ <th>Config Name</th>
+ <th>Description</th>
+ <th>Default Value</th>
+ </tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.compactOnCommit</td>
+ <td>Whether we perform a range compaction of the RocksDB instance for the commit operation.</td>
+ <td>False</td>
+ </tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
+ <td>Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is RocksDB's default SST file format.</td>
+ <td>4</td>
+ </tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB</td>
+ <td>The size capacity in MB for a cache of blocks.</td>
+ <td>8</td>
+ </tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs</td>
+ <td>The waiting time in milliseconds for acquiring the lock in the load operation of a RocksDB instance.</td>
+ <td>60000</td>
+ </tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad</td>
+ <td>Whether we reset all ticker and histogram stats for RocksDB on load.</td>
+ <td>True</td>
+ </tr>
+</table>
+
+#### State Store and task locality
The stateful operations store states for events in state stores of executors. State stores occupy resources such as memory and disk space to store the states.
So it is more efficient to keep a state store provider running in the same executor across different streaming batches.
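
The settings documented in the diff above can be collected in one place before a query is launched. The following is only a minimal sketch in plain Python, not part of the commit: the helper names `rocksdb_state_store_conf` and `to_spark_submit_args` are hypothetical, while the config keys, the provider class name, and the defaults are copied from the guide text and table above.

```python
# Config key and provider class as given in the guide text above.
PROVIDER_KEY = "spark.sql.streaming.stateStore.providerClass"
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
ROCKSDB_PREFIX = "spark.sql.streaming.stateStore.rocksdb."

# Option names and default values as listed in the table above.
ROCKSDB_DEFAULTS = {
    "compactOnCommit": "false",
    "blockSizeKB": "4",
    "blockCacheSizeMB": "8",
    "lockAcquireTimeoutMs": "60000",
    "resetStatsOnLoad": "true",
}


def rocksdb_state_store_conf(**overrides):
    """Return conf entries that select the RocksDB provider (hypothetical helper).

    `overrides` maps short option names (e.g. blockCacheSizeMB) to values.
    Names not present in the table are rejected so typos do not pass silently.
    """
    unknown = set(overrides) - set(ROCKSDB_DEFAULTS)
    if unknown:
        raise ValueError(f"unknown RocksDB option(s): {sorted(unknown)}")
    conf = {PROVIDER_KEY: ROCKSDB_PROVIDER}
    for name, value in overrides.items():
        # Conf values are passed as strings; booleans become true/false.
        text = str(value).lower() if isinstance(value, bool) else str(value)
        conf[ROCKSDB_PREFIX + name] = text
    return conf


def to_spark_submit_args(conf):
    """Render conf entries as `--conf key=value` arguments (hypothetical helper)."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


if __name__ == "__main__":
    conf = rocksdb_state_store_conf(blockCacheSizeMB=64, compactOnCommit=True)
    print(" ".join(to_spark_submit_args(conf)))
```

The resulting arguments can be appended to a `spark-submit` command line. In a PySpark application the same key/value pairs could instead be passed one by one to `SparkSession.builder.config(key, value)`; either way, the assumption here is that they are set before the streaming query starts. Options that are left out keep the defaults from the table.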