This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 72f56f73840 [SPARK-42566][SS] RocksDB StateStore lock acquisition should happen after getting input iterator from inputRDD
72f56f73840 is described below

commit 72f56f738408c5471309318e300014560bec285e
Author: Huanli Wang <huanli.w...@databricks.com>
AuthorDate: Sat Feb 25 18:38:31 2023 +0900

    [SPARK-42566][SS] RocksDB StateStore lock acquisition should happen after getting input iterator from inputRDD
    
    The current behavior of the `compute` method in both `StateStoreRDD` and `ReadStateStoreRDD` is that we first get the state store instance and then get the input iterator from the inputRDD.
    
    For the RocksDB state store, the running task acquires and holds the lock for this instance. A retried or speculative task will fail to acquire the lock and eventually abort the job if there are network issues. For example, when we shrink the executors, the alive one will try to fetch data from the killed ones because it doesn't know the target location (prefetched from the driver) is dead until it tries to fetch data. The query might hang for a long time as the execu [...]
    
    Making lock acquisition happen after retrieving the input iterator should avoid this situation.
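    
    The fixed ordering in `StateStoreRDD.compute` looks like the sketch below (comments added here for illustration; the actual two-line move is in the diff at the end of this message):
    
    ```
    override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
      val storeProviderId = getStateProviderId(partition)
    
      // Materialize the input iterator first: this is where a shuffle fetch
      // can block or fail, so it must not happen while holding the lock.
      val inputIter = dataRDD.iterator(partition, ctxt)
      // Only now acquire the state store (and, for RocksDB, its instance lock).
      val store = StateStore.get(
        storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion,
        storeConf, hadoopConfBroadcast.value.value)
      storeUpdateFunction(store, inputIter)
    }
    ```
    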
    ### What changes were proposed in this pull request?
    
    Making lock acquisition happen after retrieving the input iterator.
    
    ### Why are the changes needed?
    
    Avoid failures like the following when there is a network issue:
    ```
    java.lang.IllegalStateException: StateStoreId(opId=0,partId=3,name=default): RocksDB instance could not be acquired by
    [ThreadId: Some(47), task: 3.1 in stage 57, TID 793] as it was not released by [ThreadId: Some(51), task: 3.1 in stage 57, TID 342] after 60003 ms.
    ```
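    
    As a minimal, self-contained sketch of the failure mode (hypothetical names; this is not Spark's actual RocksDB locking code): the original task holds the instance lock while blocked on a fetch, so a retried task's 60-second lock attempt times out, mirroring the error above.
    
    ```
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock
    
    object LockTimeoutDemo {
      private val instanceLock = new ReentrantLock()
    
      // Stand-in for the old ordering: grab the store lock, then block on a
      // network fetch that may hang for a long time.
      def oldOrderingTask(fetchMillis: Long): Unit = {
        instanceLock.lock()
        try Thread.sleep(fetchMillis) // simulates a hanging shuffle fetch
        finally instanceLock.unlock()
      }
    
      def main(args: Array[String]): Unit = {
        val original = new Thread(() => oldOrderingTask(fetchMillis = 120000L))
        original.start()
        Thread.sleep(100) // let the original task take the lock first
    
        // The retried/speculative task gives up after ~60s, as in the log above.
        if (!instanceLock.tryLock(60000L, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException(
            "RocksDB instance could not be acquired: still held by the original task")
        }
      }
    }
    ```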
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests should be good enough.
    
    Closes #40162 from huanliwang-db/rocksdb-2.
    
    Authored-by: Huanli Wang <huanli.w...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/execution/streaming/state/StateStoreRDD.scala    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index fbe83ad92bb..c62403f1dc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -86,10 +86,10 @@ class ReadStateStoreRDD[T: ClassTag, U: ClassTag](
   override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
     val storeProviderId = getStateProviderId(partition)
 
+    val inputIter = dataRDD.iterator(partition, ctxt)
     val store = StateStore.getReadOnly(
       storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion,
       storeConf, hadoopConfBroadcast.value.value)
-    val inputIter = dataRDD.iterator(partition, ctxt)
     storeReadFunction(store, inputIter)
   }
 }
@@ -120,10 +120,10 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
   override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
     val storeProviderId = getStateProviderId(partition)
 
+    val inputIter = dataRDD.iterator(partition, ctxt)
     val store = StateStore.get(
       storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeVersion,
       storeConf, hadoopConfBroadcast.value.value)
-    val inputIter = dataRDD.iterator(partition, ctxt)
     storeUpdateFunction(store, inputIter)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
