This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 907074bafad [SPARK-38881][DSTREAMS][KINESIS][PYSPARK] Added Support
for CloudWatch MetricsLevel Config
907074bafad is described below
commit 907074bafad0da3d1c802a4389589658ecf93432
Author: Mark Khaitman <[email protected]>
AuthorDate: Sat Apr 16 21:30:15 2022 -0500
[SPARK-38881][DSTREAMS][KINESIS][PYSPARK] Added Support for CloudWatch
MetricsLevel Config
JIRA: https://issues.apache.org/jira/browse/SPARK-38881
### What changes were proposed in this pull request?
This exposes a configuration option (metricsLevel) used for CloudWatch metrics
reporting when consuming from an AWS Kinesis stream. This option is already
available in the Scala/Java Spark APIs.
This relates to https://issues.apache.org/jira/browse/SPARK-27420 which was
merged as part of Spark 3.0.0
### Why are the changes needed?
This change is desirable as it further exposes the metricsLevel config
parameter that was added for the Scala/Java Spark APIs when working with the
Kinesis Streaming integration, and makes it available to the PySpark API as
well.
### Does this PR introduce _any_ user-facing change?
No. Default behavior of MetricsLevel.DETAILED is maintained.
### How was this patch tested?
This change passes all tests. Local testing was done with a development
Kinesis stream in AWS to confirm that metrics were no longer being reported
to CloudWatch after specifying MetricsLevel.NONE in the PySpark Kinesis
streaming context creation. It also worked as it does today when the
MetricsLevel parameter is left out, which results in the default of DETAILED,
with CloudWatch metrics appearing again.
Built with:
```
# ./build/mvn -pl :spark-streaming-kinesis-asl_2.12 -DskipTests
-Pkinesis-asl clean install
```
Tested with small pyspark kinesis streaming context + AWS kinesis stream,
using updated streaming kinesis asl jar:
```
# spark-submit --packages
org.apache.spark:spark-streaming-kinesis-asl_2.12:3.2.1 --jars
spark/connector/kinesis-asl/target/spark-streaming-kinesis-asl_2.12-3.4.0-SNAPSHOT.jar
metricsLevelTesting.py
```
Closes #36201 from mkman84/metricsLevel-pyspark.
Authored-by: Mark Khaitman <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../kinesis/KinesisUtilsPythonHelper.scala | 10 ++++++++++
docs/streaming-kinesis-integration.md | 10 ++++++----
python/pyspark/streaming/kinesis.py | 22 +++++++++++++++++-----
python/pyspark/streaming/tests/test_kinesis.py | 5 ++++-
4 files changed, 37 insertions(+), 10 deletions(-)
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
index 0056438c4ee..8abaef6b834 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis
import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
@@ -37,6 +38,7 @@ private class KinesisUtilsPythonHelper {
regionName: String,
initialPositionInStream: Int,
checkpointInterval: Duration,
+ metricsLevel: Int,
storageLevel: StorageLevel,
awsAccessKeyId: String,
awsSecretKey: String,
@@ -64,6 +66,13 @@ private class KinesisUtilsPythonHelper {
"InitialPositionInStream.LATEST or
InitialPositionInStream.TRIM_HORIZON")
}
+ val cloudWatchMetricsLevel = metricsLevel match {
+ case 0 => MetricsLevel.DETAILED
+ case 1 => MetricsLevel.SUMMARY
+ case 2 => MetricsLevel.NONE
+ case _ => MetricsLevel.DETAILED
+ }
+
val builder = KinesisInputDStream.builder.
streamingContext(jssc).
checkpointAppName(kinesisAppName).
@@ -72,6 +81,7 @@ private class KinesisUtilsPythonHelper {
regionName(regionName).
initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(kinesisInitialPosition)).
checkpointInterval(checkpointInterval).
+ metricsLevel(cloudWatchMetricsLevel).
storageLevel(storageLevel)
if (stsAssumeRoleArn != null && stsSessionName != null && stsExternalId !=
null) {
diff --git a/docs/streaming-kinesis-integration.md
b/docs/streaming-kinesis-integration.md
index dc80ff05226..2ce30d7efe2 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -56,6 +56,7 @@ A Kinesis stream can be set up at one of the valid Kinesis
endpoints with 1 or m
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
+ .metricsLevel([metricsLevel.DETAILED])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
@@ -78,6 +79,7 @@ A Kinesis stream can be set up at one of the valid Kinesis
endpoints with 1 or m
.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
+ .metricsLevel([metricsLevel.DETAILED])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build();
@@ -90,20 +92,20 @@ A Kinesis stream can be set up at one of the valid Kinesis
endpoints with 1 or m
kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name],
[endpoint URL],
- [region name], [initial position], [checkpoint interval],
StorageLevel.MEMORY_AND_DISK_2)
+ [region name], [initial position], [checkpoint interval],
[metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
See the [API docs](api/python/reference/pyspark.streaming.html#kinesis)
and the
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
Refer to the [Running the Example](#running-the-example) subsection for
instructions to run the example.
+ - CloudWatch metrics level and dimensions. See [the AWS documentation
about monitoring
KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)
for details. Default is MetricsLevel.DETAILED
+
</div>
</div>
- You may also provide the following settings. These are currently only
supported in Scala and Java.
+ You may also provide the following settings. This is currently only
supported in Scala and Java.
- A "message handler function" that takes a Kinesis `Record` and
returns a generic object `T`, in case you would like to use other data included
in a `Record` such as partition key.
- - CloudWatch metrics level and dimensions. See [the AWS documentation
about monitoring
KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)
for details.
-
<div class="codetabs">
<div data-lang="scala" markdown="1">
import collection.JavaConverters._
diff --git a/python/pyspark/streaming/kinesis.py
b/python/pyspark/streaming/kinesis.py
index 150fb79f572..0eede341f9b 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -23,7 +23,16 @@ from pyspark.streaming.context import StreamingContext
from pyspark.util import _print_missing_jar
-__all__ = ["KinesisUtils", "InitialPositionInStream", "utf8_decoder"]
+__all__ = ["KinesisUtils", "InitialPositionInStream", "MetricsLevel",
"utf8_decoder"]
+
+
+class InitialPositionInStream:
+ LATEST, TRIM_HORIZON = (0, 1)
+
+
+class MetricsLevel:
+ DETAILED, SUMMARY, NONE = (0, 1, 2)
+
T = TypeVar("T")
@@ -46,6 +55,7 @@ class KinesisUtils:
regionName: str,
initialPositionInStream: str,
checkpointInterval: int,
+ metricsLevel: int = MetricsLevel.DETAILED,
storageLevel: StorageLevel = ...,
awsAccessKeyId: Optional[str] = ...,
awsSecretKey: Optional[str] = ...,
@@ -66,6 +76,7 @@ class KinesisUtils:
regionName: str,
initialPositionInStream: str,
checkpointInterval: int,
+ metricsLevel: int = MetricsLevel.DETAILED,
storageLevel: StorageLevel = ...,
awsAccessKeyId: Optional[str] = ...,
awsSecretKey: Optional[str] = ...,
@@ -85,6 +96,7 @@ class KinesisUtils:
regionName: str,
initialPositionInStream: str,
checkpointInterval: int,
+ metricsLevel: int = MetricsLevel.DETAILED,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
awsAccessKeyId: Optional[str] = None,
awsSecretKey: Optional[str] = None,
@@ -123,6 +135,9 @@ class KinesisUtils:
Checkpoint interval(in seconds) for Kinesis checkpointing. See the
Kinesis
Spark Streaming documentation for more details on the different
types of checkpoints.
+ metricsLevel : int
+ Level of CloudWatch PutMetrics.
+ Can be set to either DETAILED, SUMMARY, or NONE. (default is
DETAILED)
storageLevel : :class:`pyspark.StorageLevel`, optional
Storage level to use for storing the received objects (default is
StorageLevel.MEMORY_AND_DISK_2)
@@ -178,6 +193,7 @@ class KinesisUtils:
regionName,
initialPositionInStream,
jduration,
+ metricsLevel,
jlevel,
awsAccessKeyId,
awsSecretKey,
@@ -187,7 +203,3 @@ class KinesisUtils:
)
stream: DStream = DStream(jstream, ssc, NoOpSerializer())
return stream.map(lambda v: decoder(v))
-
-
-class InitialPositionInStream:
- LATEST, TRIM_HORIZON = (0, 1)
diff --git a/python/pyspark/streaming/tests/test_kinesis.py
b/python/pyspark/streaming/tests/test_kinesis.py
index 221ec4dd984..7b09f5b8f5d 100644
--- a/python/pyspark/streaming/tests/test_kinesis.py
+++ b/python/pyspark/streaming/tests/test_kinesis.py
@@ -18,7 +18,7 @@ import time
import unittest
from pyspark import StorageLevel
-from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream,
MetricsLevel
from pyspark.testing.streamingutils import (
should_test_kinesis,
kinesis_requirement_message,
@@ -38,6 +38,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
"us-west-2",
InitialPositionInStream.LATEST,
2,
+ MetricsLevel.DETAILED,
StorageLevel.MEMORY_AND_DISK_2,
)
KinesisUtils.createStream(
@@ -48,6 +49,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
"us-west-2",
InitialPositionInStream.LATEST,
2,
+ MetricsLevel.DETAILED,
StorageLevel.MEMORY_AND_DISK_2,
"awsAccessKey",
"awsSecretKey",
@@ -69,6 +71,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
kinesisTestUtils.regionName(),
InitialPositionInStream.LATEST,
10,
+ MetricsLevel.DETAILED,
StorageLevel.MEMORY_ONLY,
aWSCredentials.getAWSAccessKeyId(),
aWSCredentials.getAWSSecretKey(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]