This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e594f0  [SPARK-35763][SS] Remove the StateStoreCustomMetric subclass enumeration dependency
8e594f0 is described below

commit 8e594f084a9f307cefcad13ccf439b0b608228c9
Author: Venki Korukanti <venki.koruka...@gmail.com>
AuthorDate: Thu Jun 17 07:48:24 2021 +0900

    [SPARK-35763][SS] Remove the StateStoreCustomMetric subclass enumeration dependency
    
    ### What changes were proposed in this pull request?
    
    Remove the dependency on enumerating the subclasses of `StateStoreCustomMetric`.
    
    To achieve this, add a couple of utility methods to `StateStoreCustomMetric` (a usage sketch follows the list):
    * `withNewDesc(desc: String)` for cloning the instance with a new `desc` (currently used in `SymmetricHashJoinStateManager`)
    * `createSQLMetric(sparkContext: SparkContext): SQLMetric` for creating a corresponding `SQLMetric` to show the metric in the UI and accumulate it at the query level (currently used in `statefulOperator.stateStoreCustomMetrics`)
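
    A minimal usage sketch of the two methods; the metric name and the ambient `sparkContext` value are illustrative assumptions, not part of this change:

    ```scala
    // Hypothetical example; any StateStoreCustomMetric subclass works the same way.
    val metric = StateStoreCustomSizeMetric(name = "memoryUsedBytes", desc = "memory used")

    // Clone the metric with a new description, e.g. to prefix the join side.
    val leftMetric = metric.withNewDesc("left: " + metric.desc)

    // Create the matching SQLMetric for the UI and query-level accumulation
    // (assumes a SparkContext named `sparkContext` is in scope).
    val sqlMetric = metric.createSQLMetric(sparkContext)
    ```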
    
    ### Why are the changes needed?
    
    Code in [SymmetricHashJoinStateManager](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L321) and [StateStoreWriter](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L129) relies on the subclass implementations of [StateStoreCustomMetric](https://github.com/apache/spark/blob/master/sql/core/src/main/sca [...]
    
    If a new subclass of `StateStoreCustomMetric` is added, it requires code changes to `SymmetricHashJoinStateManager` and `StateStoreWriter`, and we may miss the update if there is no existing test coverage.
    
    To prevent these issues, add a couple of utility methods to `StateStoreCustomMetric` as mentioned above.
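
    For illustration, a hypothetical new metric type would now only need to implement the trait, with no pattern matches to update elsewhere. This is a sketch only, not part of this change; it assumes `SQLMetrics.createAverageMetric` is available:

    ```scala
    // Hypothetical new subclass: it compiles against the new trait without any
    // change to SymmetricHashJoinStateManager or StateStoreWriter.
    case class StateStoreCustomAverageMetric(name: String, desc: String)
        extends StateStoreCustomMetric {
      override def withNewDesc(desc: String): StateStoreCustomAverageMetric = copy(desc = desc)

      override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
        SQLMetrics.createAverageMetric(sparkContext, desc)
    }
    ```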
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UT and a new UT
    
    Closes #32914 from vkorukanti/SPARK-35763.
    
    Authored-by: Venki Korukanti <venki.koruka...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/StateStore.scala | 29 +++++++++++++++++++---
 .../state/SymmetricHashJoinStateManager.scala      | 10 +-------
 .../execution/streaming/statefulOperators.scala    |  7 +-----
 .../streaming/state/StateStoreSuite.scala          | 14 +++++++++++
 4 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 889477b..60ad318 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -182,16 +183,36 @@ object StateStoreMetrics {
 
 /**
  * Name and description of custom implementation-specific metrics that a
- * state store may wish to expose.
+ * state store may wish to expose. Also provides a [[SQLMetric]] instance to
+ * show the metric in the UI and accumulate it at the query level.
  */
 trait StateStoreCustomMetric {
   def name: String
   def desc: String
+  def withNewDesc(desc: String): StateStoreCustomMetric
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
 }
 
-case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
-case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
-case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
+case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric {
+  override def withNewDesc(newDesc: String): StateStoreCustomSumMetric = copy(desc = newDesc)
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createMetric(sparkContext, desc)
+}
+
+case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric {
+  override def withNewDesc(desc: String): StateStoreCustomSizeMetric = copy(desc = desc)
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createSizeMetric(sparkContext, desc)
+}
+
+case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric {
+  override def withNewDesc(desc: String): StateStoreCustomTimingMetric = copy(desc = desc)
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createTimingMetric(sparkContext, desc)
+}
 
 /**
  * An exception thrown when an invalid UnsafeRow is detected in state store.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 5c74811..d342c83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -319,15 +319,7 @@ class SymmetricHashJoinStateManager(
      keyWithIndexToValueMetrics.numKeys,       // represent each buffered row only once
      keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
-        case (s @ StateStoreCustomSumMetric(_, desc), value) =>
-          s.copy(desc = newDesc(desc)) -> value
-        case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
-          s.copy(desc = newDesc(desc)) -> value
-        case (s @ StateStoreCustomTimingMetric(_, desc), value) =>
-          s.copy(desc = newDesc(desc)) -> value
-        case (s, _) =>
-          throw new IllegalArgumentException(
-            s"Unknown state store custom metric is found at metrics: $s")
+        case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
       }
     )
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index cef0c01..f9b639b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -126,12 +126,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
-      case StateStoreCustomSumMetric(name, desc) =>
-        name -> SQLMetrics.createMetric(sparkContext, desc)
-      case StateStoreCustomSizeMetric(name, desc) =>
-        name -> SQLMetrics.createSizeMetric(sparkContext, desc)
-      case StateStoreCustomTimingMetric(name, desc) =>
-        name -> SQLMetrics.createTimingMetric(sparkContext, desc)
+      metric => (metric.name, metric.createSQLMetric(sparkContext))
     }.toMap
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index af5e9bb..4323725 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1013,6 +1013,20 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(err.getMessage.contains("Cannot put a null value"))
   }
 
+  test("SPARK-35763: StateStoreCustomMetric withNewDesc and createSQLMetric") {
+    val metric = StateStoreCustomSizeMetric(name = "m1", desc = "desc1")
+    val metricNew = metric.withNewDesc("new desc")
+    assert(metricNew.desc === "new desc", "incorrect description in copied instance")
+    assert(metricNew.name === "m1", "incorrect name in copied instance")
+
+    val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763").set(RPC_NUM_RETRIES, 1)
+    withSpark(new SparkContext(conf)) { sc =>
+      val sqlMetric = metric.createSQLMetric(sc)
+      assert(sqlMetric != null)
+      assert(sqlMetric.name === Some("desc1"))
+    }
+  }
+
   /** Return a new provider with a random id */
   def newStoreProvider(): ProviderClass
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
