This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new da3ccab  [SPARK-34270][SS] Combine StateStoreMetrics should not override StateStoreCustomMetric
da3ccab is described below

commit da3ccab9428f9a284d01d4d432e3c47f28dc6d6e
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 29 20:50:39 2021 -0800

    [SPARK-34270][SS] Combine StateStoreMetrics should not override StateStoreCustomMetric
    
    This patch proposes to sum up custom metric values instead of taking an arbitrary one when combining `StateStoreMetrics`.
    
    For stateful joins in Structured Streaming, we need to combine `StateStoreMetrics` from both the left and right sides. Currently, when a custom metric with the same name appears on both sides, we simply keep an arbitrary one of the two values. As a result, the value from the other side is lost from the combined metric.
    
    Yes, this is a user-facing change: it corrects the metrics collected for stateful joins.
    
    Tested with a new unit test.
    
    Closes #31369 from viirya/SPARK-34270.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 50d14c98c3828d8d9cc62ebc61ad4d20398ee6c6)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/execution/streaming/state/StateStore.scala | 10 +++++++++-
 .../streaming/state/StateStoreSuite.scala          | 22 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index d3313b8..3e27012 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -123,10 +123,18 @@ case class StateStoreMetrics(
 
 object StateStoreMetrics {
   def combine(allMetrics: Seq[StateStoreMetrics]): StateStoreMetrics = {
+    val distinctCustomMetrics = 
allMetrics.flatMap(_.customMetrics.keys).distinct
+    val customMetrics = allMetrics.flatMap(_.customMetrics)
+    val combinedCustomMetrics = distinctCustomMetrics.map { customMetric =>
+      val sameMetrics = customMetrics.filter(_._1 == customMetric)
+      val sumOfMetrics = sameMetrics.map(_._2).sum
+      customMetric -> sumOfMetrics
+    }.toMap
+
     StateStoreMetrics(
       allMetrics.map(_.numKeys).sum,
       allMetrics.map(_.memoryUsedBytes).sum,
-      allMetrics.flatMap(_.customMetrics).toMap)
+      combinedCustomMetrics)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 5e97314..39828c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -968,6 +968,28 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
   }
 
+  test("SPARK-34270: StateStoreMetrics.combine should not override individual 
metrics") {
+    val customSumMetric = StateStoreCustomSumMetric("metric1", "custom metric 
1")
+    val customSizeMetric = StateStoreCustomSizeMetric("metric2", "custom 
metric 2")
+    val customTimingMetric = StateStoreCustomTimingMetric("metric3", "custom 
metric 3")
+
+    val leftCustomMetrics: Map[StateStoreCustomMetric, Long] =
+      Map(customSumMetric -> 10L, customSizeMetric -> 5L, customTimingMetric 
-> 100L)
+    val leftMetrics = StateStoreMetrics(1, 10, leftCustomMetrics)
+
+    val rightCustomMetrics: Map[StateStoreCustomMetric, Long] =
+      Map(customSumMetric -> 20L, customSizeMetric -> 15L, customTimingMetric 
-> 300L)
+    val rightMetrics = StateStoreMetrics(3, 20, rightCustomMetrics)
+
+    val combinedMetrics = StateStoreMetrics.combine(Seq(leftMetrics, 
rightMetrics))
+    assert(combinedMetrics.numKeys == 4)
+    assert(combinedMetrics.memoryUsedBytes == 30)
+    assert(combinedMetrics.customMetrics.size == 3)
+    assert(combinedMetrics.customMetrics(customSumMetric) == 30L)
+    assert(combinedMetrics.customMetrics(customSizeMetric) == 20L)
+    assert(combinedMetrics.customMetrics(customTimingMetric) == 400L)
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to