This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 574a43b [SPARK-37147][SS] MetricsReporter producing
NullPointerException when element 'triggerExecution' not present in Map[]
574a43b is described below
commit 574a43be0b4f46917467f4b7aabb2b6cb5892168
Author: Radek Busz <[email protected]>
AuthorDate: Sun Oct 31 07:30:14 2021 +0900
[SPARK-37147][SS] MetricsReporter producing NullPointerException when
element 'triggerExecution' not present in Map[]
### What changes were proposed in this pull request?
Bug Fix.
The problematic code is in `MetricsReporter`:
`registerGauge("latency", _.durationMs.get("triggerExecution").longValue(),
0L)`
Instead of `.getOrDefault(...).longValue()` it uses
`.get("triggerExecution").longValue()`, which throws a NullPointerException
if "triggerExecution" is not in the durationMs map.
Solution: use `.getOrDefault` when accessing the map.
### Why are the changes needed?
When `MetricsReporter.scala` registers a Gauge it occasionally throws an
NPE. This breaks reporting custom metrics via Dropwizard and logs a stack trace
multiple times. It usually happens when using Structured Streaming on a slow
data source, but I'm not able to reliably reproduce it every time.
### Does this PR introduce _any_ user-facing change?
Yes - fixes occasional failures when reporting metrics with Dropwizard
### How was this patch tested?
Added a unit-test.
Closes #34426 from gitplaneta/SPARK-37147.
Authored-by: Radek Busz <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/execution/streaming/MetricsReporter.scala | 2 +-
.../apache/spark/sql/streaming/StreamingQuerySuite.scala | 13 +++++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 8709822..600b16f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -40,7 +40,7 @@ class MetricsReporter(
// together in Ganglia as a single metric group
registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)
registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
- registerGauge("latency", _.durationMs.get("triggerExecution").longValue(),
0L)
+ registerGauge("latency", _.durationMs.getOrDefault("triggerExecution",
0L).longValue(), 0L)
private val timestampFormat = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9c2403d..21a0b24 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -19,13 +19,16 @@ package org.apache.spark.sql.streaming
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Collections
import java.util.concurrent.CountDownLatch
import scala.collection.mutable
+import scala.util.{Success, Try}
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.RandomStringUtils
import org.apache.hadoop.fs.Path
+import org.mockito.Mockito.when
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -465,6 +468,16 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
}
}
+ test("SPARK-37147: MetricsReporter does not fail when durationMs is empty") {
+ val stateOpProgressMock = mock[StreamingQueryProgress]
+
when(stateOpProgressMock.durationMs).thenReturn(Collections.emptyMap[String,
java.lang.Long]())
+ val streamExecMock = mock[StreamExecution]
+ when(streamExecMock.lastProgress).thenReturn(stateOpProgressMock)
+
+ val gauges = new MetricsReporter(streamExecMock,
"").metricRegistry.getGauges()
+ assert(Try(gauges.get("latency").getValue) == Success(0L))
+ }
+
test("input row calculation with same V1 source used twice in self-join") {
val streamingTriggerDF = spark.createDataset(1 to 10).toDF
val streamingInputDF =
createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]