This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 25c624f269b [SPARK-45084][SS] StateOperatorProgress to use accurate 
effective shuffle partition number
25c624f269b is described below

commit 25c624f269bfec027ad889c1764d2904f19a2506
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Fri Sep 15 10:36:44 2023 +0900

    [SPARK-45084][SS] StateOperatorProgress to use accurate effective shuffle 
partition number
    
    ### What changes were proposed in this pull request?
    Make StateOperatorProgress.numShufflePartitions report the effective number
of shuffle partitions.
    The metric StateStoreWriter.numShufflePartitions is dropped at the same
time, as it is no longer a metric.
    
    ### Why are the changes needed?
    Currently, there is a numShufflePartitions "metric" reported in
    StateOperatorProgress part of the progress report. However, the number is
reported by aggregating values across executors, so in the case of task retries
or speculative execution, the metric can be higher than the number of shuffle
partitions for the query plan. We change the metric to use the value from the
query plan to make it more usable.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    StreamingAggregationSuite contains a unit test that validates the value
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #42822 from siying/numShufflePartitionsMetric.
    
    Authored-by: Siying Dong <siying.d...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/execution/streaming/statefulOperators.scala    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index b31f6151fce..67d89c7f40f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -143,7 +143,6 @@ trait StateStoreWriter extends StatefulOperator with 
PythonSQLMetrics { self: Sp
     "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time 
to remove"),
     "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
commit changes"),
     "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by 
state"),
-    "numShufflePartitions" -> SQLMetrics.createMetric(sparkContext, "number of 
shuffle partitions"),
     "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
       "number of state store instances")
   ) ++ stateStoreCustomMetrics ++ pythonMetrics
@@ -159,6 +158,8 @@ trait StateStoreWriter extends StatefulOperator with 
PythonSQLMetrics { self: Sp
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
       new java.util.HashMap(customMetrics.mapValues(long2Long).toMap.asJava)
 
+    // We now don't report number of shuffle partitions inside the state 
operator. Instead,
+    // it will be filled when the stream query progress is reported
     new StateOperatorProgress(
       operatorName = shortName,
       numRowsTotal = longMetric("numTotalStateRows").value,
@@ -169,7 +170,7 @@ trait StateStoreWriter extends StatefulOperator with 
PythonSQLMetrics { self: Sp
       commitTimeMs = longMetric("commitTimeMs").value,
       memoryUsedBytes = longMetric("stateMemory").value,
       numRowsDroppedByWatermark = 
longMetric("numRowsDroppedByWatermark").value,
-      numShufflePartitions = longMetric("numShufflePartitions").value,
+      numShufflePartitions = 
stateInfo.map(_.numPartitions.toLong).getOrElse(-1L),
       numStateStoreInstances = longMetric("numStateStoreInstances").value,
       javaConvertedCustomMetrics
     )
@@ -183,7 +184,6 @@ trait StateStoreWriter extends StatefulOperator with 
PythonSQLMetrics { self: Sp
     assert(numStateStoreInstances >= 1, s"invalid number of stores: 
$numStateStoreInstances")
     // Shuffle partitions capture the number of tasks that have this stateful 
operator instance.
     // For each task instance this number is incremented by one.
-    longMetric("numShufflePartitions") += 1
     longMetric("numStateStoreInstances") += numStateStoreInstances
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to