This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 3b5956c  [SPARK-34270][SS] Combine StateStoreMetrics should not override StateStoreCustomMetric
3b5956c is described below

commit 3b5956c9003d63ce2728b48421c950e32b19ba01
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 29 20:50:39 2021 -0800

    [SPARK-34270][SS] Combine StateStoreMetrics should not override StateStoreCustomMetric
    
    This patch proposes to sum up custom metric values instead of taking an arbitrary one when combining `StateStoreMetrics`.
    
    For stateful joins in Structured Streaming, we need to combine `StateStoreMetrics` from both the left and right sides. Currently we simply take an arbitrary one of the custom metrics that share the same name on the left and right sides, so the value from the other side is dropped and roughly half of each metric is missing.
    
    User-facing change: yes, this corrects the metrics collected for stateful joins.
    
    Tested with a new unit test in `StateStoreSuite`.
    
    Closes #31369 from viirya/SPARK-34270.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 50d14c98c3828d8d9cc62ebc61ad4d20398ee6c6)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/execution/streaming/state/StateStore.scala | 10 +++++++++-
 .../streaming/state/StateStoreSuite.scala          | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index f87a2fb..7c69e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -165,10 +165,18 @@ case class StateStoreMetrics(
 
 object StateStoreMetrics {
   def combine(allMetrics: Seq[StateStoreMetrics]): StateStoreMetrics = {
+    val distinctCustomMetrics = allMetrics.flatMap(_.customMetrics.keys).distinct
+    val customMetrics = allMetrics.flatMap(_.customMetrics)
+    val combinedCustomMetrics = distinctCustomMetrics.map { customMetric =>
+      val sameMetrics = customMetrics.filter(_._1 == customMetric)
+      val sumOfMetrics = sameMetrics.map(_._2).sum
+      customMetric -> sumOfMetrics
+    }.toMap
+
     StateStoreMetrics(
       allMetrics.map(_.numKeys).sum,
       allMetrics.map(_.memoryUsedBytes).sum,
-      allMetrics.flatMap(_.customMetrics).toMap)
+      combinedCustomMetrics)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index b81e5b9..7e8f955 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -990,6 +990,28 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
   }
 
+  test("SPARK-34270: StateStoreMetrics.combine should not override individual metrics") {
+    val customSumMetric = StateStoreCustomSumMetric("metric1", "custom metric 1")
+    val customSizeMetric = StateStoreCustomSizeMetric("metric2", "custom metric 2")
+    val customTimingMetric = StateStoreCustomTimingMetric("metric3", "custom metric 3")
+
+    val leftCustomMetrics: Map[StateStoreCustomMetric, Long] =
+      Map(customSumMetric -> 10L, customSizeMetric -> 5L, customTimingMetric -> 100L)
+    val leftMetrics = StateStoreMetrics(1, 10, leftCustomMetrics)
+
+    val rightCustomMetrics: Map[StateStoreCustomMetric, Long] =
+      Map(customSumMetric -> 20L, customSizeMetric -> 15L, customTimingMetric -> 300L)
+    val rightMetrics = StateStoreMetrics(3, 20, rightCustomMetrics)
+
+    val combinedMetrics = StateStoreMetrics.combine(Seq(leftMetrics, rightMetrics))
+    assert(combinedMetrics.numKeys == 4)
+    assert(combinedMetrics.memoryUsedBytes == 30)
+    assert(combinedMetrics.customMetrics.size == 3)
+    assert(combinedMetrics.customMetrics(customSumMetric) == 30L)
+    assert(combinedMetrics.customMetrics(customSizeMetric) == 20L)
+    assert(combinedMetrics.customMetrics(customTimingMetric) == 400L)
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to