This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 988439d [SPARK-37987][SS] Fix flaky test
StreamingAggregationSuite.changing schema of state when restarting query
988439d is described below
commit 988439d7287482c465f7da6c8e9c14303488158f
Author: Jungtaek Lim <[email protected]>
AuthorDate: Mon Jan 24 17:33:24 2022 +0900
[SPARK-37987][SS] Fix flaky test StreamingAggregationSuite.changing schema
of state when restarting query
### What changes were proposed in this pull request?
This PR fixes a flaky test `StreamingAggregationSuite.changing schema of
state when restarting query`, via adjusting the number of shuffle partition to
1.
The flakiness was due to the optimization on schema verification - we only
verify it in partition 0 since it is costly and redundant to verify the schema
for all partitions. Other partitions are still possible to provide other errors
which are considered as unexpected.
### Why are the changes needed?
This PR fixes a flaky test.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Ran test suite 10 times locally.
Closes #35298 from HeartSaVioR/SPARK-37987.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 3b540ad822a53a8cb94159dc8aa3c66d34085e3e)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/streaming/StreamingAggregationSuite.scala | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 77334ad..8a7bb8b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -766,7 +766,11 @@ class StreamingAggregationSuite extends
StateStoreMetricsTest with Assertions {
}
testQuietlyWithAllStateVersions("changing schema of state when restarting
query",
- (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false")) {
+ (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false"),
+ // Since we only do the check in partition 0 and other partitions still
may fail with
+ // different errors, we change the number of shuffle partitions to 1 to
make the test
+ // result to be deterministic.
+ (SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
withTempDir { tempDir =>
val (inputData, aggregated) =
prepareTestForChangingSchemaOfState(tempDir)
@@ -790,7 +794,11 @@ class StreamingAggregationSuite extends
StateStoreMetricsTest with Assertions {
testQuietlyWithAllStateVersions("changing schema of state when restarting
query -" +
" schema check off",
(SQLConf.STATE_SCHEMA_CHECK_ENABLED.key, "false"),
- (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false")) {
+ (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false"),
+ // Since we only do the check in partition 0 and other partitions still
may fail with
+ // different errors, we change the number of shuffle partitions to 1 to
make the test
+ // result to be deterministic.
+ (SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
withTempDir { tempDir =>
val (inputData, aggregated) =
prepareTestForChangingSchemaOfState(tempDir)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]