This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6718723  [SPARK-34270][SS] Combine StateStoreMetrics should not override StateStoreCustomMetric
6718723 is described below

commit 671872390486dfdde6f0dd17f4757058774bb429
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 29 20:50:39 2021 -0800

    [SPARK-34270][SS] Combine StateStoreMetrics should not override StateStoreCustomMetric
    
    This patch proposes to sum up custom metric values instead of taking an arbitrary one when combining `StateStoreMetrics`.
    
    For stateful joins in Structured Streaming, we need to combine `StateStoreMetrics` from both the left and right sides. Currently, when both sides report a custom metric with the same name, we simply keep the value from an arbitrary one of them. As a result, the other side's value is dropped and the reported metric can be missing about half of its total.
    
    Yes, this is a user-facing fix: it corrects the metrics collected for stateful joins.
    
    Tested with a new unit test.
    
    Closes #31369 from viirya/SPARK-34270.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 50d14c98c3828d8d9cc62ebc61ad4d20398ee6c6)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/execution/streaming/state/StateStore.scala | 10 +++++++++-
 .../streaming/state/StateStoreSuite.scala          | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 7d80fd0..65f0d18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -123,10 +123,18 @@ case class StateStoreMetrics(
 
 object StateStoreMetrics {
   def combine(allMetrics: Seq[StateStoreMetrics]): StateStoreMetrics = {
+    // Sum the values of custom metrics that share a key (e.g. the left and
+    // right state stores of a stream-stream join). A plain `toMap` keeps only
+    // one value per key and silently drops the other side's contribution.
+    val combinedCustomMetrics = allMetrics
+      .flatMap(_.customMetrics)
+      .groupBy { case (metric, _) => metric }
+      .map { case (metric, entries) => metric -> entries.map(_._2).sum }
+
     StateStoreMetrics(
       allMetrics.map(_.numKeys).sum,
       allMetrics.map(_.memoryUsedBytes).sum,
-      allMetrics.flatMap(_.customMetrics).toMap)
+      combinedCustomMetrics)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 4888799..6a4982c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -970,6 +970,28 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
   }
 
+  test("SPARK-34270: StateStoreMetrics.combine should not override individual metrics") {
+    val customSumMetric = StateStoreCustomSumMetric("metric1", "custom metric 1")
+    val customSizeMetric = StateStoreCustomSizeMetric("metric2", "custom metric 2")
+    val customTimingMetric = StateStoreCustomTimingMetric("metric3", "custom metric 3")
+    // Both sides report every custom metric, so each combined value must be the sum.
+    val leftCustomMetrics: Map[StateStoreCustomMetric, Long] =
+      Map(customSumMetric -> 10L, customSizeMetric -> 5L, customTimingMetric -> 100L)
+    val leftMetrics = StateStoreMetrics(1, 10, leftCustomMetrics)
+
+    val rightCustomMetrics: Map[StateStoreCustomMetric, Long] =
+      Map(customSumMetric -> 20L, customSizeMetric -> 15L, customTimingMetric -> 300L)
+    val rightMetrics = StateStoreMetrics(3, 20, rightCustomMetrics)
+
+    val combinedMetrics = StateStoreMetrics.combine(Seq(leftMetrics, rightMetrics))
+    assert(combinedMetrics.numKeys === 4)
+    assert(combinedMetrics.memoryUsedBytes === 30)
+    assert(combinedMetrics.customMetrics.size === 3)
+    assert(combinedMetrics.customMetrics(customSumMetric) === 30L)
+    assert(combinedMetrics.customMetrics(customSizeMetric) === 20L)
+    assert(combinedMetrics.customMetrics(customTimingMetric) === 400L)
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to