This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0da463e [SPARK-35880][SS] Track the duplicates dropped count in
dedupe operator
0da463e is described below
commit 0da463e59304954515f003f98574c740b47b89fb
Author: Venki Korukanti <[email protected]>
AuthorDate: Mon Jun 28 13:21:00 2021 +0900
[SPARK-35880][SS] Track the duplicates dropped count in dedupe operator
### What changes were proposed in this pull request?
Add a metric to track the number of duplicate input rows dropped by the
streaming deduplication operator. Also introduce a
`StatefulOperatorCustomMetric` trait that allows stateful operators to report
their own unique metrics in `StateOperatorProgress.customMetrics` of
`StreamingQueryProgress`.
### Why are the changes needed?
1. Having the count of dropped duplicates helps monitor and debug
incorrect-results issues, or find the reasons for state size increases in the
dedupe operator.
2. The new API `StatefulOperatorCustomMetric` allows stateful operators to
expose their own unique metrics in `StateOperatorProgress.customMetrics` of
`StreamingQueryProgress`.
### Does this PR introduce _any_ user-facing change?
Yes. For the deduplication stateful operator, a new metric
`numDroppedDuplicateRows` is shown in `StateOperatorProgress` within
`StreamingQueryProgress`. Example `StreamingQueryProgress` output in JSON form:
```
{
  "id" : "510be3cd-a955-4faf-8456-d97c78d39af5",
  "runId" : "c170c4cd-04cb-4a28-b054-74020e3998e1",
  ...,
  "stateOperators" : [ {
    "numRowsTotal" : 1,
    "numRowsUpdated" : 1,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 0,
      "loadedMapCacheMissCount" : 0,
      "numDroppedDuplicateRows" : 0,
      "stateOnCurrentVersionSizeBytes" : 392
    }
  } ],
  ...
}
```
### How was this patch tested?
Existing unit tests for regression, plus a new unit test.
Closes #33065 from vkorukanti/SPARK-35880.
Authored-by: Venki Korukanti <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../execution/streaming/statefulOperators.scala | 43 ++++++++++++-
.../sql/streaming/StateStoreMetricsTest.scala | 70 +++++++++++++++++-----
.../streaming/StreamingDeduplicationSuite.scala | 49 +++++++++++++++
3 files changed, 143 insertions(+), 19 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index f0527c1..41dcfde 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._
import scala.collection.JavaConverters._
+import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -66,6 +67,24 @@ trait StatefulOperator extends SparkPlan {
}
}
+/**
+ * Custom stateful operator metric definition that allows operators to expose their own custom
+ * metrics. Each definition also creates the [[SQLMetric]] instance that shows the metric in the
+ * UI and accumulates it at the query level.
+ */
+trait StatefulOperatorCustomMetric {
+  /** Metric key, used both in the operator's SQL metrics and in the progress' custom metrics. */
+  def name: String
+  /** Human-readable description of the metric. */
+  def desc: String
+  /** Creates the [[SQLMetric]] that backs this metric for one operator instance. */
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
+}
+
+/**
+ * Custom stateful operator metric that is a simple counter. It is backed by a "sum" metric created
+ * via `SQLMetrics.createMetric` (with `desc` as its display name). It is meant to accumulate counts
+ * (e.g. incremented with `+=`) rather than to report a point-in-time gauge value.
+ */
+case class StatefulOperatorCustomSumMetric(name: String, desc: String)
+  extends StatefulOperatorCustomMetric {
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createMetric(sparkContext, desc)
+}
+
/** An operator that reads from a StateStore. */
trait StateStoreReader extends StatefulOperator {
override lazy val metrics = Map(
@@ -75,7 +94,7 @@ trait StateStoreReader extends StatefulOperator {
/** An operator that writes to a StateStore. */
trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
- override lazy val metrics = Map(
+ override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
"numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
"number of rows which are dropped by watermark"),
@@ -92,7 +111,7 @@ trait StateStoreWriter extends StatefulOperator { self:
SparkPlan =>
* the driver after this SparkPlan has been executed and metrics have been
updated.
*/
def getProgress(): StateOperatorProgress = {
- val customMetrics = stateStoreCustomMetrics
+ val customMetrics = (stateStoreCustomMetrics ++
statefulOperatorCustomMetrics)
.map(entry => entry._1 -> longMetric(entry._1).value)
val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
@@ -130,6 +149,19 @@ trait StateStoreWriter extends StatefulOperator { self:
SparkPlan =>
}.toMap
}
+  /**
+   * Set of stateful operator custom metrics. These are captured as part of the generic
+   * key-value map [[StateOperatorProgress.customMetrics]].
+   * Stateful operators can override this method to provide their own unique custom metrics.
+   */
+  protected def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = Nil
+
+  /**
+   * [[SQLMetric]]s for [[customStatefulOperatorMetrics]], keyed by metric name.
+   *
+   * This is a lazy val rather than a def, so the metrics are created only once per operator
+   * instance. Both `metrics` and `getProgress()` read it, and `getProgress()` only needs the keys.
+   * Re-creating the SQLMetrics on every call would therefore be wasted work.
+   */
+  private lazy val statefulOperatorCustomMetrics: Map[String, SQLMetric] =
+    customStatefulOperatorMetrics.map { metric =>
+      metric.name -> metric.createSQLMetric(sparkContext)
+    }.toMap
+
protected def applyRemovingRowsOlderThanWatermark(
iter: Iterator[InternalRow],
predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
@@ -468,11 +500,11 @@ case class StreamingDeduplicateExec(
Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")) {
(store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions,
child.output)
val numOutputRows = longMetric("numOutputRows")
- val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
val commitTimeMs = longMetric("commitTimeMs")
+ val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
val baseIterator = watermarkPredicateForData match {
case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter,
predicate)
@@ -492,6 +524,7 @@ case class StreamingDeduplicateExec(
true
} else {
// Drop duplicated rows
+ numDroppedDuplicateRows += 1
false
}
}
@@ -509,6 +542,10 @@ case class StreamingDeduplicateExec(
override def outputPartitioning: Partitioning = child.outputPartitioning
+  /** Declares the dedupe-specific counter of input rows that are dropped as duplicates. */
+  override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = {
+    val droppedDuplicateRows =
+      StatefulOperatorCustomSumMetric("numDroppedDuplicateRows", "number of duplicates dropped")
+    Seq(droppedDuplicateRows)
+  }
+
override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
= {
eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs >
eventTimeWatermark.get
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index be83f0e..5073723 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -32,25 +32,14 @@ trait StateStoreMetricsTest extends StreamTest {
def assertNumStateRows(
total: Seq[Long],
updated: Seq[Long],
- droppedByWatermark: Seq[Long]): AssertOnQuery =
+ droppedByWatermark: Seq[Long]): AssertOnQuery = {
AssertOnQuery(s"Check total state rows = $total, updated state rows =
$updated" +
s", rows dropped by watermark = $droppedByWatermark") { q =>
// This assumes that the streaming query will not make any progress
while the eventually
// is being executed.
eventually(timeout(streamingTimeout)) {
- val recentProgress = q.recentProgress
- require(recentProgress.nonEmpty, "No progress made, cannot check num
state rows")
- require(recentProgress.length <
spark.sessionState.conf.streamingProgressRetention,
- "This test assumes that all progresses are present in
q.recentProgress but " +
- "some may have been dropped due to retention limits")
-
- if (q.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
- lastQuery = q
-
- val numStateOperators = recentProgress.last.stateOperators.length
- val progressesSinceLastCheck = recentProgress
- .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
- .filter(_.stateOperators.length == numStateOperators)
+ val (progressesSinceLastCheck, lastCheckedProgressIndex,
numStateOperators) =
+ retrieveProgressesSinceLastCheck(q)
val allNumUpdatedRowsSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
@@ -61,7 +50,7 @@ trait StateStoreMetricsTest extends StreamTest {
lazy val debugString = "recent progresses:\n" +
progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
- val numTotalRows =
recentProgress.last.stateOperators.map(_.numRowsTotal)
+ val numTotalRows =
progressesSinceLastCheck.last.stateOperators.map(_.numRowsTotal)
assert(numTotalRows === total, s"incorrect total rows, $debugString")
val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck,
numStateOperators)
@@ -72,10 +61,36 @@ trait StateStoreMetricsTest extends StreamTest {
assert(numRowsDroppedByWatermark === droppedByWatermark,
s"incorrect dropped rows by watermark, $debugString")
- lastCheckedRecentProgressIndex = recentProgress.length - 1
+ advanceLastCheckedRecentProgressIndex(lastCheckedProgressIndex)
+ }
+ true
+ }
+ }
+
+  /**
+   * AssertOnQuery that verifies a custom metric of one state operator. The value of `metric` for
+   * the operator at `operatorIndex`, summed over all progresses reported since the last check,
+   * must equal `expected`. Fails with a descriptive message if the metric is missing from a
+   * progress.
+   */
+  def assertStateOperatorCustomMetric(
+      metric: String, expected: Long, operatorIndex: Int = 0): AssertOnQuery = {
+    AssertOnQuery(s"Check metrics $metric has value $expected") { q =>
+      eventually(timeout(streamingTimeout)) {
+        val (progressesSinceLastCheck, lastCheckedProgressIndex, numStateOperators) =
+          retrieveProgressesSinceLastCheck(q)
+        assert(operatorIndex >= 0 && operatorIndex < numStateOperators,
+          s"Invalid operator Index: $operatorIndex")
+
+        lazy val debugString = "recent progresses:\n" +
+          progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
+
+        // customMetrics is a java.util.Map of boxed java.lang.Long values and returns null for an
+        // unknown key. Check for that explicitly instead of letting the unboxing throw a bare NPE.
+        val allCustomMetricValuesSinceLastCheck = progressesSinceLastCheck.map { progress =>
+          val value = progress.stateOperators(operatorIndex).customMetrics.get(metric)
+          assert(value != null, s"custom metric ($metric) not found, $debugString")
+          Long2long(value)
+        }
+
+        assert(allCustomMetricValuesSinceLastCheck.sum === expected,
+          s"incorrect custom metric ($metric), $debugString")
+
+        advanceLastCheckedRecentProgressIndex(lastCheckedProgressIndex)
       }
       true
     }
+  }
def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery
= {
assert(total.length === updated.length)
@@ -96,4 +111,27 @@ trait StateStoreMetricsTest extends StreamTest {
"Arrays are of different lengths:\n" +
arraySeq.map(_.toSeq).mkString("\n"))
(0 until arrayLength).map { index => arraySeq.map(_.apply(index)).sum }
}
+
+  /**
+   * Returns a triple describing the progresses of `execution` since the last check:
+   *  - the progresses reported since the last check;
+   *  - the index of the latest progress, to be passed to
+   *    [[advanceLastCheckedRecentProgressIndex]] once the caller's assertions pass;
+   *  - the number of state operators in the latest progress.
+   * Only progresses with the same number of state operators as the latest one are returned.
+   *
+   * Throws `IllegalArgumentException` in three cases:
+   *  - no progress is available yet;
+   *  - progresses may have been dropped by the retention limit;
+   *  - no new progress arrived since the last check.
+   * The callers wrap this in `eventually`, so these cases are retried.
+   */
+  def retrieveProgressesSinceLastCheck(
+      execution: StreamExecution): (Array[StreamingQueryProgress], Int, Int) = {
+    val recentProgress = execution.recentProgress
+    // Check for emptiness rather than null: an empty array would make `.last` below throw an
+    // unhelpful NoSuchElementException.
+    require(recentProgress.nonEmpty, "No progress made")
+    require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention,
+      "This test assumes that all progresses are present in q.recentProgress but " +
+        "some may have been dropped due to retention limits")
+
+    // The checked-index bookkeeping is per query; start over when a different query is checked.
+    if (execution.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
+    lastQuery = execution
+
+    val numStateOperators = recentProgress.last.stateOperators.length
+    val recentProgresses = recentProgress
+      .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
+      .filter(_.stateOperators.length == numStateOperators)
+    // Without this, an empty slice surfaced as a NoSuchElementException from `.last`.
+    require(recentProgresses.nonEmpty, "No new progress since the last check")
+
+    (recentProgresses, recentProgress.length - 1, numStateOperators)
+  }
+
+  /** Marks every progress up to `newLastCheckedRecentProgressIndex` as already checked. */
+  def advanceLastCheckedRecentProgressIndex(newLastCheckedRecentProgressIndex: Int): Unit =
+    lastCheckedRecentProgressIndex = newLastCheckedRecentProgressIndex
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index ac9cd1a..dc2e787 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -332,4 +332,53 @@ class StreamingDeduplicationSuite extends
StateStoreMetricsTest {
}
}
+  test("SPARK-35880: custom metric numDroppedDuplicateRows in state operator progress") {
+    // Without a watermark: every row whose "_1" key was already seen is counted as a duplicate.
+    val dedupeInputData = MemoryStream[(String, Int)]
+    val dedupe = dedupeInputData.toDS().dropDuplicates("_1")
+
+    testStream(dedupe, Append)(
+      AddData(dedupeInputData, "a" -> 1),
+      CheckLastBatch("a" -> 1),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0),
+
+      // "a" was already seen in the previous batch.
+      AddData(dedupeInputData, "a" -> 2, "b" -> 3),
+      CheckLastBatch("b" -> 3),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1),
+
+      // "a" and "b" were already seen; only "c" is new.
+      AddData(dedupeInputData, "a" -> 5, "b" -> 2, "c" -> 9),
+      CheckLastBatch("c" -> 9),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 2)
+    )
+
+    // With a watermark: rows older than the watermark are removed by the watermark filter before
+    // deduplication, so they are not counted as dropped duplicates.
+    val dedupeWithWMInputData = MemoryStream[Int]
+    val dedupeWithWatermark = dedupeWithWMInputData.toDS()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates()
+      .select($"eventTime".cast("long").as[Long])
+
+    testStream(dedupeWithWatermark, Append)(
+      // 5 copies each of 10..15: 30 rows, 6 distinct, so 24 duplicates.
+      AddData(dedupeWithWMInputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckAnswer(10 to 15: _*),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 24),
+
+      // 14 is still held in state, so it is dropped as a duplicate.
+      AddData(dedupeWithWMInputData, 14),
+      CheckNewAnswer(),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1),
+
+      // Advance watermark to 15 secs, no-data-batch drops rows <= 15
+      AddData(dedupeWithWMInputData, 25),
+      CheckNewAnswer(25),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0),
+
+      AddData(dedupeWithWMInputData, 10), // Emits nothing: 10 is older than the watermark
+      CheckNewAnswer(),
+      // Late data is filtered by the watermark, not counted as a duplicate.
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 0),
+
+      // The second 26 duplicates the first one within the same batch.
+      AddData(dedupeWithWMInputData, 26, 26),
+      CheckNewAnswer(26),
+      assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1)
+    )
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]