This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d642b6bd174a [SPARK-51252][SS][TESTS] Make tests more consistent for 
StateStoreInstanceMetricSuite
d642b6bd174a is described below

commit d642b6bd174a6985c4c6ecd0030fbe60ad74169e
Author: Zeyu Chen <zyc...@gmail.com>
AuthorDate: Tue Apr 1 08:52:51 2025 +0900

    [SPARK-51252][SS][TESTS] Make tests more consistent for 
StateStoreInstanceMetricSuite
    
    ### What changes were proposed in this pull request?
    
    SPARK-51252
    
    Fix for flaky tests described in #50030.
    
    This PR relaxes the version check in the tests and processes more data in 
queries to force more batches.
    
    I also reduced the number of shuffle partitions for one of the join queries 
because there used to be 5 * 4 state stores that would need a snapshot to 
upload in a short time span. Now there are only 3 * 4 stores to deal with.
    
    ### Why are the changes needed?
    
    The tests previously introduced in #50030 are quite flaky depending on the 
environment, resulting in CI gates failing some tests in the suite.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Re-ran and verified the tests multiple times locally, but primarily relied 
on the current merge gates to confirm that they pass properly.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50405 from zecookiez/SPARK-51252-hdfs-flaky-fix.
    
    Authored-by: Zeyu Chen <zyc...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../state/StateStoreInstanceMetricSuite.scala      | 35 +++++++++++++++-------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
index e7f0276d676f..b6de3a5f3351 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
@@ -88,6 +88,8 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
               ProcessAllAvailable(),
               AddData(inputData, "b"),
               ProcessAllAvailable(),
+              AddData(inputData, "b"),
+              ProcessAllAvailable(),
               CheckNewAnswer("a", "b"),
               Execute { q =>
                 // Make sure only smallest K active metrics are published
@@ -104,7 +106,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
                       .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
                   )
                   // All state store instances should have uploaded a version
-                  assert(instanceMetrics.forall(_._2 == 2))
+                  assert(instanceMetrics.forall(_._2 >= 0))
                 }
               },
               StopStream
@@ -150,6 +152,8 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
               ProcessAllAvailable(),
               AddData(inputData, "b"),
               ProcessAllAvailable(),
+              AddData(inputData, "b"),
+              ProcessAllAvailable(),
               CheckNewAnswer("a", "b"),
               Execute { q =>
                 // Partitions getting skipped (id 0 and 1) do not have an 
uploaded version, leaving
@@ -179,10 +183,10 @@ class StateStoreInstanceMetricSuite extends StreamTest 
with AlsoTestWithRocksDBF
                     instanceMetrics.size == q.sparkSession.conf
                       .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
                   )
-                  // Two metrics published are -1, the remainder should all be 
set to version 2
-                  // as they uploaded properly.
+                  // Two metrics published are -1, but the remainder should 
all be set to a
+                  // non-negative version as they uploaded properly.
                   assert(
-                    instanceMetrics.count(_._2 == 2) == q.sparkSession.conf
+                    instanceMetrics.count(_._2 >= 0) == q.sparkSession.conf
                       .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) 
- 2
                   )
                 }
@@ -209,7 +213,8 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
           SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
           SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
           SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-          SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
+          SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10",
+          SQLConf.SHUFFLE_PARTITIONS.key -> "3"
         ) {
           withTempDir { checkpointDir =>
             val input1 = MemoryStream[Int]
@@ -229,11 +234,16 @@ class StateStoreInstanceMetricSuite extends StreamTest 
with AlsoTestWithRocksDBF
               ProcessAllAvailable(),
               AddData(input1, 2, 3),
               ProcessAllAvailable(),
-              CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
+              AddData(input1, 2),
+              ProcessAllAvailable(),
+              AddData(input2, 3),
+              ProcessAllAvailable(),
+              AddData(input1, 4),
+              ProcessAllAvailable(),
               Execute { q =>
                 eventually(timeout(10.seconds)) {
                   // Make sure only smallest K active metrics are published.
-                  // There are 5 * 4 = 20 metrics in total because of join, 
but only 10
+                  // There are 3 * 4 = 12 metrics in total because of join, 
but only 10
                   // are published.
                   val instanceMetrics = q.lastProgress
                     .stateOperators(0)
@@ -247,7 +257,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
                       .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
                   )
                   // All state store instances should have uploaded a version
-                  assert(instanceMetrics.forall(_._2 == 2))
+                  assert(instanceMetrics.forall(_._2 >= 0))
                 }
               },
               StopStream
@@ -300,7 +310,12 @@ class StateStoreInstanceMetricSuite extends StreamTest 
with AlsoTestWithRocksDBF
               ProcessAllAvailable(),
               AddData(input1, 2, 3),
               ProcessAllAvailable(),
-              CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
+              AddData(input1, 2),
+              ProcessAllAvailable(),
+              AddData(input2, 3),
+              ProcessAllAvailable(),
+              AddData(input1, 4),
+              ProcessAllAvailable(),
               Execute { q =>
                 eventually(timeout(10.seconds)) {
                   // Make sure only smallest K active metrics are published.
@@ -326,7 +341,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with 
AlsoTestWithRocksDBF
                   assert(badInstanceMetrics.count(_._2 == -1) == 2 * 4)
                   // The rest should have uploaded a version
                   assert(
-                    allInstanceMetrics.count(_._2 == 2) == q.sparkSession.conf
+                    allInstanceMetrics.count(_._2 >= 0) == q.sparkSession.conf
                       .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) 
- 2 * 4
                   )
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to