This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 72f56f73840  [SPARK-42566][SS] RocksDB StateStore lock acquisition should happen after getting input iterator from inputRDD

72f56f73840 is described below

commit 72f56f738408c5471309318e300014560bec285e
Author: Huanli Wang <huanli.w...@databricks.com>
AuthorDate: Sat Feb 25 18:38:31 2023 +0900

[SPARK-42566][SS] RocksDB StateStore lock acquisition should happen after getting input iterator from inputRDD

The current behavior of the `compute` method in both `StateStoreRDD` and `ReadStateStoreRDD` is to first get the state store instance and then get the input iterator from the inputRDD. For the RocksDB state store, the running task acquires and holds the lock for this instance, so a retried or speculative task will fail to acquire the lock and can eventually abort the job when there are network issues. For example, when we shrink the executors, a surviving executor will try to fetch data from the killed ones, because it does not know that the target location (prefetched from the driver) is dead until it tries to fetch the data. The query might hang for a long time as the execu [...]

Making lock acquisition happen after retrieving the input iterator should avoid this situation.

### What changes were proposed in this pull request?

Make lock acquisition happen after retrieving the input iterator.

### Why are the changes needed?

To avoid failures like the following when there is a network issue:

```
java.lang.IllegalStateException: StateStoreId(opId=0,partId=3,name=default): RocksDB instance could not be acquired by [ThreadId: Some(47), task: 3.1 in stage 57, TID 793] as it was not released by [ThreadId: Some(51), task: 3.1 in stage 57, TID 342] after 60003 ms.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests should be sufficient.

Closes #40162 from huanliwang-db/rocksdb-2.
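The lock-ordering problem described above can be illustrated in isolation. The following is a minimal sketch, not Spark's actual `StateStore` API: `storeLock` stands in for the per-partition RocksDB instance lock, and the `fetchInput` closure stands in for `dataRDD.iterator(...)`, which can hang or fail during a shuffle fetch. With the old ordering, the lock is held while the fetch is in flight, so a retried task cannot acquire it; with the new ordering, a failing fetch never holds the lock.

```scala
import java.util.concurrent.locks.ReentrantLock

// A hypothetical sketch of the ordering change, not Spark's real code.
object LockOrderingSketch {
  val storeLock = new ReentrantLock()

  // Old ordering: acquire the store lock first, then fetch the input.
  // If the fetch hangs on a network issue, the lock stays held and a
  // retried or speculative task times out trying to acquire it.
  def oldOrder(fetchInput: () => Iterator[Int]): Iterator[Int] = {
    storeLock.lock()
    try fetchInput() finally storeLock.unlock()
  }

  // New ordering (this commit): fetch the input iterator first, then
  // take the lock. A hanging or failing fetch never blocks the lock.
  def newOrder(fetchInput: () => Iterator[Int]): Iterator[Int] = {
    val input = fetchInput()
    storeLock.lock()
    try input finally storeLock.unlock()
  }

  // Probe helper: reports whether the lock was held at fetch time.
  def lockHeldDuringFetch(
      order: (() => Iterator[Int]) => Iterator[Int]): Boolean = {
    var held = false
    order(() => { held = storeLock.isLocked; Iterator.empty })
    held
  }
}
```

Here `lockHeldDuringFetch(LockOrderingSketch.oldOrder)` returns `true` while `lockHeldDuringFetch(LockOrderingSketch.newOrder)` returns `false`, which is exactly the behavioral difference the two-line swap in the diff below achieves.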
Authored-by: Huanli Wang <huanli.w...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/execution/streaming/state/StateStoreRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index fbe83ad92bb..c62403f1dc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -86,10 +86,10 @@ class ReadStateStoreRDD[T: ClassTag, U: ClassTag](
   override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
     val storeProviderId = getStateProviderId(partition)
 
+    val inputIter = dataRDD.iterator(partition, ctxt)
     val store = StateStore.getReadOnly(
       storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion,
       storeConf, hadoopConfBroadcast.value.value)
-    val inputIter = dataRDD.iterator(partition, ctxt)
     storeReadFunction(store, inputIter)
   }
 }
@@ -120,10 +120,10 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
   override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
     val storeProviderId = getStateProviderId(partition)
 
+    val inputIter = dataRDD.iterator(partition, ctxt)
     val store = StateStore.get(
       storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion,
       storeConf, hadoopConfBroadcast.value.value)
-    val inputIter = dataRDD.iterator(partition, ctxt)
     storeUpdateFunction(store, inputIter)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org