This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6718723 [SPARK-34270][SS] Combine StateStoreMetrics should not
override StateStoreCustomMetric
6718723 is described below
commit 671872390486dfdde6f0dd17f4757058774bb429
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 29 20:50:39 2021 -0800
[SPARK-34270][SS] Combine StateStoreMetrics should not override
StateStoreCustomMetric
This patch proposes to sum up custom metric values instead of taking an
arbitrary one when combining `StateStoreMetrics`.
For a stateful join in Structured Streaming, we need to combine
`StateStoreMetrics` from both the left and right sides. Currently, when both
sides report a custom metric with the same name, we simply keep an arbitrary
one of the two values. As a result, the combined metric misses the
contribution of the other side.
Yes, this is a user-facing change: it corrects the metrics collected for
stateful joins.
Tested with a unit test.
Closes #31369 from viirya/SPARK-34270.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 50d14c98c3828d8d9cc62ebc61ad4d20398ee6c6)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/execution/streaming/state/StateStore.scala | 10 +++++++++-
.../streaming/state/StateStoreSuite.scala | 22 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 7d80fd0..65f0d18 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -123,10 +123,18 @@ case class StateStoreMetrics(
object StateStoreMetrics {
def combine(allMetrics: Seq[StateStoreMetrics]): StateStoreMetrics = {
+ val distinctCustomMetrics =
allMetrics.flatMap(_.customMetrics.keys).distinct
+ val customMetrics = allMetrics.flatMap(_.customMetrics)
+ val combinedCustomMetrics = distinctCustomMetrics.map { customMetric =>
+ val sameMetrics = customMetrics.filter(_._1 == customMetric)
+ val sumOfMetrics = sameMetrics.map(_._2).sum
+ customMetric -> sumOfMetrics
+ }.toMap
+
StateStoreMetrics(
allMetrics.map(_.numKeys).sum,
allMetrics.map(_.memoryUsedBytes).sum,
- allMetrics.flatMap(_.customMetrics).toMap)
+ combinedCustomMetrics)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 4888799..6a4982c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -970,6 +970,28 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
}
+ test("SPARK-34270: StateStoreMetrics.combine should not override individual
metrics") {
+ val customSumMetric = StateStoreCustomSumMetric("metric1", "custom metric
1")
+ val customSizeMetric = StateStoreCustomSizeMetric("metric2", "custom
metric 2")
+ val customTimingMetric = StateStoreCustomTimingMetric("metric3", "custom
metric 3")
+
+ val leftCustomMetrics: Map[StateStoreCustomMetric, Long] =
+ Map(customSumMetric -> 10L, customSizeMetric -> 5L, customTimingMetric
-> 100L)
+ val leftMetrics = StateStoreMetrics(1, 10, leftCustomMetrics)
+
+ val rightCustomMetrics: Map[StateStoreCustomMetric, Long] =
+ Map(customSumMetric -> 20L, customSizeMetric -> 15L, customTimingMetric
-> 300L)
+ val rightMetrics = StateStoreMetrics(3, 20, rightCustomMetrics)
+
+ val combinedMetrics = StateStoreMetrics.combine(Seq(leftMetrics,
rightMetrics))
+ assert(combinedMetrics.numKeys == 4)
+ assert(combinedMetrics.memoryUsedBytes == 30)
+ assert(combinedMetrics.customMetrics.size == 3)
+ assert(combinedMetrics.customMetrics(customSumMetric) == 30L)
+ assert(combinedMetrics.customMetrics(customSizeMetric) == 20L)
+ assert(combinedMetrics.customMetrics(customTimingMetric) == 400L)
+ }
+
/** Return a new provider with a random id */
def newStoreProvider(): ProviderClass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]