Huanli Wang created SPARK-48105:
-----------------------------------
Summary: Fix the data corruption issue when state store unloading
and snapshotting happen concurrently for the HDFS state store
Key: SPARK-48105
URL: https://issues.apache.org/jira/browse/SPARK-48105
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Huanli Wang
There are two race conditions between state store snapshotting and state store
unloading which could result in query failure and potential data corruption.
Case 1:
# the maintenance thread pool encounters some issues and calls
[stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
which in turn calls
[threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
However, this function doesn't wait for the stop operation to complete and
moves on to do the state store [unload and
clear.|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778]
# the provider unload then [closes the state
store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
which [clears the values of
loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
for the HDFS-backed state store.
# if the not-yet-stopped maintenance thread is still running and tries to take
a snapshot, the data in the underlying `HDFSBackedStateStoreMap` has already
been removed. If this snapshot process completes successfully, we will write
corrupted data, and the following batches will consume this corrupted data.
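The Case 1 sequence above can be reproduced deterministically outside Spark. The sketch below (plain Scala, not Spark code; `UnloadRace`, `loadedMaps`, and the latches are hypothetical stand-ins) shows why a `shutdown()` that does not wait for in-flight tasks is unsafe: the still-running "maintenance" task reads the map after the "unload" cleared it and records an empty snapshot.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

object UnloadRace {
  def main(args: Array[String]): Unit = {
    // stand-in for the HDFS-backed provider's loadedMaps
    val loadedMaps = TrieMap("key" -> "value")
    val pool = Executors.newFixedThreadPool(1)
    val started = new CountDownLatch(1)
    val cleared = new CountDownLatch(1)
    val snapshotSize = new AtomicInteger(-1)

    // "maintenance" task: still running while the pool is being stopped
    pool.submit(new Runnable {
      def run(): Unit = {
        started.countDown()
        cleared.await() // wait until the unload path has cleared the map
        snapshotSize.set(loadedMaps.size) // snapshot sees the cleared map
      }
    })

    started.await()
    pool.shutdown()      // like threadPool.stop: does NOT wait for the task
    loadedMaps.clear()   // provider unload clears the data entries
    cleared.countDown()
    pool.awaitTermination(10, TimeUnit.SECONDS)

    // the "successful" snapshot captured zero rows -> corrupted checkpoint
    assert(snapshotSize.get == 0)
    println(s"rows captured by snapshot: ${snapshotSize.get}")
  }
}
```

Replacing `shutdown()` with `shutdown()` followed by `awaitTermination(...)` before clearing the map removes this interleaving.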
Case 2:
# In executor_1, the maintenance thread is about to snapshot state_store_1. It
retrieves the `HDFSBackedStateStoreMap` object from loadedMaps and then
[releases the lock on
loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
# state_store_1 is loaded in another executor, e.g. executor_2.
# another state store, state_store_2, is loaded on executor_1, which calls
[reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
to notify the driver.
# executor_1 then
[unloads|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
the state stores that are no longer active, which clears the data entries in
the `HDFSBackedStateStoreMap`.
# the snapshotting thread terminates early and uploads an incomplete snapshot
to the cloud store because the [iterator has no next
element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
after the clear.
# future batches then consume this corrupted data.
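Case 2 hinges on the snapshot thread holding a reference to the same mutable map that the unload path clears in place. A minimal sketch (plain Scala; `SnapshotRace` and `stateMap` are illustrative names, not Spark APIs):

```scala
import scala.collection.mutable

object SnapshotRace {
  def main(args: Array[String]): Unit = {
    // stand-in for HDFSBackedStateStoreMap: a mutable map owned by the provider
    val stateMap = mutable.HashMap("a" -> 1, "b" -> 2, "c" -> 3)

    // snapshot thread grabs a reference under the loadedMaps lock,
    // then releases the lock (step 1 of Case 2)
    val mapForSnapshot = stateMap

    // unload on the same executor clears the entries in place (step 4)
    stateMap.clear()

    // the snapshot now iterates the shared, already-cleared map: the iterator
    // has no next element, so an "empty but successful" snapshot gets written
    val rowsWritten = mapForSnapshot.iterator.toList
    assert(rowsWritten.isEmpty)
    println(s"rows written to snapshot: ${rowsWritten.size}")
  }
}
```

The snapshot completes without error precisely because an exhausted iterator looks identical to a legitimately small store, which is why the corruption goes undetected until later batches read it.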
Proposed fix:
* When we close the HDFS state store, we should only remove the entry from
`loadedMaps` rather than actively cleaning up the data. JVM GC will reclaim
those objects once nothing references them anymore.
* We should wait for the maintenance thread pool to stop before unloading the
providers.
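Both parts of the proposed fix can be sketched in isolation (plain Scala; `ProposedFix` and the 30-second timeout are illustrative, not the actual patch): dropping the `loadedMaps` entry leaves any map a snapshot thread already holds intact, and awaiting pool termination guarantees no maintenance task is still running when unload proceeds.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

object ProposedFix {
  def main(args: Array[String]): Unit = {
    // Fix 1: on close, remove the versioned map from loadedMaps instead of
    // clearing its entries; an in-flight snapshot holding a reference still
    // sees consistent data, and GC reclaims the map once nobody holds it.
    val loadedMaps = mutable.HashMap(1L -> Map("key" -> "value"))
    val mapHeldBySnapshot = loadedMaps(1L)
    loadedMaps.remove(1L)              // drop the entry, don't mutate the map
    assert(mapHeldBySnapshot.nonEmpty) // snapshot's view stays intact

    // Fix 2: wait for the maintenance pool to actually stop before unloading.
    val pool = Executors.newFixedThreadPool(1)
    pool.shutdown()
    val stopped = pool.awaitTermination(30, TimeUnit.SECONDS)
    assert(stopped)
    println(s"maintenance stopped before unload: $stopped")
  }
}
```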
Thanks [~anishshri-db] for helping debug this issue!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]