Repository: spark Updated Branches: refs/heads/master 0c94e48bc -> 8bb0df2c6
[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener ## What changes were proposed in this pull request? The `StreamingListener` on the PySpark side seems to lack an `onStreamingStarted` method. This patch adds it and a test for it. This patch also includes a trivial doc improvement for `createDirectStream`. Original PR is #21057. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh <[email protected]> Closes #21098 from viirya/SPARK-24014. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb0df2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb0df2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb0df2c Branch: refs/heads/master Commit: 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0 Parents: 0c94e48 Author: Liang-Chi Hsieh <[email protected]> Authored: Thu Apr 19 10:00:57 2018 +0800 Committer: jerryshao <[email protected]> Committed: Thu Apr 19 10:00:57 2018 +0800 ---------------------------------------------------------------------- python/pyspark/streaming/kafka.py | 3 ++- python/pyspark/streaming/listener.py | 6 ++++++ python/pyspark/streaming/tests.py | 7 +++++++ 3 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/kafka.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index fdb9308..ed2e0e7 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -104,7 +104,8 @@ class KafkaUtils(object): :param topics: list of topic_name to consume. :param kafkaParams: Additional params for Kafka. :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting - point of the stream. 
+ point of the stream (a dictionary mapping `TopicAndPartition` to + integers). :param keyDecoder: A function used to decode key (default is utf8_decoder). :param valueDecoder: A function used to decode value (default is utf8_decoder). :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/listener.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index b830797..d4ecc21 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -23,6 +23,12 @@ class StreamingListener(object): def __init__(self): pass + def onStreamingStarted(self, streamingStarted): + """ + Called when the streaming has been started. + """ + pass + def onReceiverStarted(self, receiverStarted): """ Called when a receiver has been started http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 7dde7c0..1039409 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase): self.batchInfosCompleted = [] self.batchInfosStarted = [] self.batchInfosSubmitted = [] + self.streamingStartedTime = [] + + def onStreamingStarted(self, streamingStarted): + self.streamingStartedTime.append(streamingStarted.time) def onBatchSubmitted(self, batchSubmitted): self.batchInfosSubmitted.append(batchSubmitted.batchInfo()) @@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase): batchInfosSubmitted = batch_collector.batchInfosSubmitted batchInfosStarted = batch_collector.batchInfosStarted batchInfosCompleted = batch_collector.batchInfosCompleted + 
streamingStartedTime = batch_collector.streamingStartedTime self.wait_for(batchInfosCompleted, 4) + self.assertEqual(len(streamingStartedTime), 1) + self.assertGreaterEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
