This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 4caa43e  [SPARK-36041][SS][DOCS] Introduce the 
RocksDBStateStoreProvider in the programming guide
4caa43e is described below

commit 4caa43e3989998506076c24d3b7020d986249e9f
Author: Yuanjian Li <[email protected]>
AuthorDate: Mon Aug 16 12:32:08 2021 -0700

    [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the 
programming guide
    
    ### What changes were proposed in this pull request?
    Add the document for the new RocksDBStateStoreProvider.
    
    ### Why are the changes needed?
    User guide for the new feature.
    
    ### Does this PR introduce _any_ user-facing change?
    No, doc only.
    
    ### How was this patch tested?
    Doc only.
    
    Closes #33683 from xuanyuanking/SPARK-36041.
    
    Authored-by: Yuanjian Li <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
    (cherry picked from commit 3d57e00a7f8d8f2f7dc0bfbfb0466ef38fb3da08)
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 docs/structured-streaming-programming-guide.md | 75 +++++++++++++++++++++++++-
 1 file changed, 74 insertions(+), 1 deletion(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index b56d8c8..3f89111 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1870,7 +1870,80 @@ hence the number is not same as the number of original 
input rows. You'd like to
 There's a known workaround: split your streaming query into multiple queries 
per stateful operator, and ensure
 end-to-end exactly once per query. Ensuring end-to-end exactly once for the 
last query is optional.
 
-### State Store and task locality
+### State Store
+
+State store is a versioned key-value store which provides both read and write 
operations. In
+Structured Streaming, we use the state store provider to handle the stateful 
operations across
+batches. There are two built-in state store provider implementations. End 
users can also implement
+their own state store provider by extending StateStoreProvider interface.
+
+#### HDFS state store provider
+
+The HDFS backend state store provider is the default implementation of 
[[StateStoreProvider]] and
+[[StateStore]] in which all the data is stored in memory map in the first 
stage, and then backed
+by files in an HDFS-compatible file system. All updates to the store have to 
be done in sets
+transactionally, and each set of updates increments the store's version. These 
versions can be
+used to re-execute the updates (by retries in RDD operations) on the correct 
version of the store,
+and regenerate the store version.
+
+#### RocksDB state store implementation
+
+As of Spark 3.2, we add a new built-in state store implementation, RocksDB 
state store provider.
+
+If you have stateful operations in your streaming query (for example, 
streaming aggregation,
+streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or 
flatMapGroupsWithState)
+and you want to maintain millions of keys in the state, then you may face 
issues related to large
+JVM garbage collection (GC) pauses causing high variations in the micro-batch 
processing times.
+This occurs because, by the implementation of HDFSBackedStateStore, the state 
data is maintained
+in the JVM memory of the executors and large number of state objects puts 
memory pressure on the
+JVM causing high GC pauses.
+
+In such cases, you can choose to use a more optimized state management 
solution based on
+[RocksDB](https://rocksdb.org/). Rather than keeping the state in the JVM 
memory, this solution
+uses RocksDB to efficiently manage the state in the native memory and the 
local disk. Furthermore,
+any changes to this state are automatically saved by Structured Streaming to 
the checkpoint
+location you have provided, thus providing full fault-tolerance guarantees 
(the same as default
+state management).
+
+To enable the new build-in state store implementation, set 
`spark.sql.streaming.stateStore.providerClass`
+to `org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider`.
+
+Here are the configs regarding to RocksDB instance of the state store provider:
+
+<table class="table">
+  <tr>
+    <th>Config Name</th>
+    <th>Description</th>
+    <th>Default Value</th>
+  </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.compactOnCommit</td>
+    <td>Whether we perform a range compaction of RocksDB instance for commit 
operation</td>
+    <td>False</td>
+  </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
+    <td>Approximate size in KB of user data packed per block for a RocksDB 
BlockBasedTable, which is a RocksDB's default SST file format.</td>
+    <td>4</td>
+  </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB</td>
+    <td>The size capacity in MB for a cache of blocks.</td>
+    <td>8</td>
+  </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs</td>
+    <td>The waiting time in millisecond for acquiring lock in the load 
operation for RocksDB instance.</td>
+    <td>60000</td>
+  </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad</td>
+    <td>Whether we resets all ticker and histogram stats for RocksDB on 
load.</td>
+    <td>True</td>
+  </tr>
+</table>
+
+#### State Store and task locality
 
 The stateful operations store states for events in state stores of executors. 
State stores occupy resources such as memory and disk space to store the states.
 So it is more efficient to keep a state store provider running in the same 
executor across different streaming batches.

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to