This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d642b6bd174a [SPARK-51252][SS][TESTS] Make tests more consistent for StateStoreInstanceMetricSuite
d642b6bd174a is described below

commit d642b6bd174a6985c4c6ecd0030fbe60ad74169e
Author: Zeyu Chen <zyc...@gmail.com>
AuthorDate: Tue Apr 1 08:52:51 2025 +0900

    [SPARK-51252][SS][TESTS] Make tests more consistent for StateStoreInstanceMetricSuite

    ### What changes were proposed in this pull request?

    SPARK-51252

    Fix for the flaky tests described in #50030. This PR relaxes the version checks in the tests and processes more data in the queries to force more batches. I also reduced the number of shuffle partitions for one of the join queries, because there used to be 5 * 4 state stores that would need a snapshot to upload in a short time span. Now there are only 3 * 4 stores to deal with.

    ### Why are the changes needed?

    The tests previously introduced in #50030 are quite flaky depending on the environment, resulting in CI gates failing some tests in the suite.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Re-ran and verified the tests multiple times locally, but primarily relied on the current merge gates to confirm that they pass consistently.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #50405 from zecookiez/SPARK-51252-hdfs-flaky-fix.

    Authored-by: Zeyu Chen <zyc...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../state/StateStoreInstanceMetricSuite.scala | 35 +++++++++++++++-------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
index e7f0276d676f..b6de3a5f3351 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
@@ -88,6 +88,8 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
       ProcessAllAvailable(),
       AddData(inputData, "b"),
       ProcessAllAvailable(),
+      AddData(inputData, "b"),
+      ProcessAllAvailable(),
       CheckNewAnswer("a", "b"),
       Execute { q =>
         // Make sure only smallest K active metrics are published
@@ -104,7 +106,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
             .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
         )
         // All state store instances should have uploaded a version
-        assert(instanceMetrics.forall(_._2 == 2))
+        assert(instanceMetrics.forall(_._2 >= 0))
       }
     },
     StopStream
@@ -150,6 +152,8 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
       ProcessAllAvailable(),
       AddData(inputData, "b"),
       ProcessAllAvailable(),
+      AddData(inputData, "b"),
+      ProcessAllAvailable(),
       CheckNewAnswer("a", "b"),
       Execute { q =>
         // Partitions getting skipped (id 0 and 1) do not have an uploaded version, leaving
@@ -179,10 +183,10 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
           instanceMetrics.size == q.sparkSession.conf
             .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
         )
-        // Two metrics published are -1, the remainder should all be set to version 2
-        // as they uploaded properly.
+        // Two metrics published are -1, but the remainder should all be set to a
+        // non-negative version as they uploaded properly.
         assert(
-          instanceMetrics.count(_._2 == 2) == q.sparkSession.conf
+          instanceMetrics.count(_._2 >= 0) == q.sparkSession.conf
             .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2
         )
       }
@@ -209,7 +213,8 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
         SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
         SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
         SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-        SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
+        SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "3"
       ) {
         withTempDir { checkpointDir =>
           val input1 = MemoryStream[Int]
@@ -229,11 +234,16 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
           ProcessAllAvailable(),
           AddData(input1, 2, 3),
           ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
+          AddData(input1, 2),
+          ProcessAllAvailable(),
+          AddData(input2, 3),
+          ProcessAllAvailable(),
+          AddData(input1, 4),
+          ProcessAllAvailable(),
           Execute { q =>
             eventually(timeout(10.seconds)) {
               // Make sure only smallest K active metrics are published.
-              // There are 5 * 4 = 20 metrics in total because of join, but only 10
+              // There are 3 * 4 = 12 metrics in total because of join, but only 10
               // are published.
               val instanceMetrics = q.lastProgress
                 .stateOperators(0)
@@ -247,7 +257,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
                 .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
             )
             // All state store instances should have uploaded a version
-            assert(instanceMetrics.forall(_._2 == 2))
+            assert(instanceMetrics.forall(_._2 >= 0))
           }
         },
         StopStream
@@ -300,7 +310,12 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
           ProcessAllAvailable(),
           AddData(input1, 2, 3),
           ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
+          AddData(input1, 2),
+          ProcessAllAvailable(),
+          AddData(input2, 3),
+          ProcessAllAvailable(),
+          AddData(input1, 4),
+          ProcessAllAvailable(),
           Execute { q =>
             eventually(timeout(10.seconds)) {
               // Make sure only smallest K active metrics are published.
@@ -326,7 +341,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
               assert(badInstanceMetrics.count(_._2 == -1) == 2 * 4)
               // The rest should have uploaded a version
               assert(
-                allInstanceMetrics.count(_._2 == 2) == q.sparkSession.conf
+                allInstanceMetrics.count(_._2 >= 0) == q.sparkSession.conf
                   .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2 * 4
               )
             }
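For readers skimming the diff, the deflaking pattern is consistent throughout: stop asserting that every store uploaded exactly snapshot version 2 (which depends on how many batches happen to run before the check fires) and instead assert the invariant that an upload occurred at all, i.e. the reported version is non-negative, with -1 marking stores that never uploaded. A minimal, self-contained sketch of that assertion pattern, using a plain Map and hypothetical store names in place of the suite's parsed query progress:

object InstanceMetricCheckSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical published metrics: store instance name -> last uploaded
    // snapshot version, where -1 means no snapshot was ever uploaded.
    val instanceMetrics: Map[String, Long] = Map(
      "store-partition-0" -> 2L,  // uploaded during the second batch
      "store-partition-1" -> 3L,  // an extra batch ran first; still a valid upload
      "store-partition-2" -> -1L  // skipped partition, never uploaded
    )

    // Brittle check: ties the test to the exact number of batches executed
    // before the assertion runs, which varies with timing and environment.
    // assert(instanceMetrics.forall(_._2 == 2))

    // Relaxed check, mirroring the commit: every non-skipped store uploaded
    // some snapshot, whatever version that happened to be.
    val expectedSkipped = 1
    assert(instanceMetrics.count(_._2 >= 0) == instanceMetrics.size - expectedSkipped)
  }
}

The extra AddData/ProcessAllAvailable steps attack the same problem from the other side: rather than assuming a fixed batch count, the tests generate enough batches that every store instance gets a chance to upload a snapshot before the assertion runs.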