This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new aff9eab9039 [SPARK-45511][SS] Fix state reader suite flakiness by cleaning up resources after each test run
aff9eab9039 is described below
commit aff9eab90392f22c0037abdf50e6894615e4dbf9
Author: Chaoqin Li <[email protected]>
AuthorDate: Fri Nov 17 07:27:28 2023 +0900
    [SPARK-45511][SS] Fix state reader suite flakiness by cleaning up resources after each test run
### What changes were proposed in this pull request?
Fix state reader suite flakiness by cleaning up resources after each test.
The reason we have to clean up StateStore per test is the maintenance task. When we run a streaming query, the state store is initialized in the executor, and registration is performed against the coordinator in the driver. The lifecycle of the state store provider is not strictly tied to the lifecycle of the streaming query - the executor closes the state store provider when the coordinator indicates to the executor that the state store provider is no longer valid, which is not [...]
This means a maintenance task against the provider can run after test A. We clear the temp directory of test A after the test has completed, which can break an operation still being performed against the state store provider used in test A, e.g. the directory no longer exists while the maintenance task is running. This won't be an issue in practice because we do not expect the checkpoint location to be temporary, but it is indeed an issue for how we set up and clean up the env for tests.
### Why are the changes needed?
To deflake the test.
Closes #43831 from chaoqin-li1123/fix_state_reader_suite.
Authored-by: Chaoqin Li <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../datasources/v2/state/StateDataSourceTestBase.scala | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
index 890a716bbef..f5392cc823f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
@@ -20,6 +20,7 @@ import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
@@ -28,6 +29,17 @@ import org.apache.spark.sql.streaming.util.StreamManualClock
trait StateDataSourceTestBase extends StreamTest with StateStoreMetricsTest {
import testImplicits._
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ }
+
+ override def afterEach(): Unit = {
+ // Stop maintenance tasks because they may access an already deleted checkpoint.
+ StateStore.stop()
+ super.afterEach()
+ }
+
  protected def runCompositeKeyStreamingAggregationQuery(checkpointRoot: String): Unit = {
val inputData = MemoryStream[Int]
val aggregated = getCompositeKeyStreamingAggregationQuery(inputData)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]