Repository: spark Updated Branches: refs/heads/master 2c664edc0 -> 703e6da1e
http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/python/pyspark/streaming/kafka.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py deleted file mode 100644 index ed2e0e7..0000000 --- a/python/pyspark/streaming/kafka.py +++ /dev/null @@ -1,506 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import warnings - -from py4j.protocol import Py4JJavaError - -from pyspark.rdd import RDD -from pyspark.storagelevel import StorageLevel -from pyspark.serializers import AutoBatchedSerializer, PickleSerializer, PairDeserializer, \ - NoOpSerializer -from pyspark.streaming import DStream -from pyspark.streaming.dstream import TransformedDStream -from pyspark.streaming.util import TransformFunction - -__all__ = ['Broker', 'KafkaMessageAndMetadata', 'KafkaUtils', 'OffsetRange', - 'TopicAndPartition', 'utf8_decoder'] - - -def utf8_decoder(s): - """ Decode the unicode as UTF-8 """ - if s is None: - return None - return s.decode('utf-8') - - -class KafkaUtils(object): - - @staticmethod - def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, - storageLevel=StorageLevel.MEMORY_AND_DISK_2, - keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): - """ - Create an input stream that pulls messages from a Kafka Broker. - - :param ssc: StreamingContext object - :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..). - :param groupId: The group id for this consumer. - :param topics: Dict of (topic_name -> numPartitions) to consume. - Each partition is consumed in its own thread. - :param kafkaParams: Additional params for Kafka - :param storageLevel: RDD storage level. - :param keyDecoder: A function used to decode key (default is utf8_decoder) - :param valueDecoder: A function used to decode value (default is utf8_decoder) - :return: A DStream object - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - if kafkaParams is None: - kafkaParams = dict() - kafkaParams.update({ - "zookeeper.connect": zkQuorum, - "group.id": groupId, - "zookeeper.connection.timeout.ms": "10000", - }) - if not isinstance(topics, dict): - raise TypeError("topics should be dict") - jlevel = ssc._sc._getJavaStorageLevel(storageLevel) - helper = KafkaUtils._get_helper(ssc._sc) - jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel) - ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) - stream = DStream(jstream, ssc, ser) - return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1]))) - - @staticmethod - def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None, - keyDecoder=utf8_decoder, valueDecoder=utf8_decoder, - messageHandler=None): - """ - Create an input stream that directly pulls messages from a Kafka Broker and specific offset. - - This is not a receiver based Kafka input stream, it directly pulls the message from Kafka - in each batch duration and processed without storing. - - This does not use Zookeeper to store offsets. The consumed offsets are tracked - by the stream itself. For interoperability with Kafka monitoring tools that depend on - Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. - You can access the offsets used in each batch from the generated RDDs (see - - To recover from driver failures, you have to enable checkpointing in the StreamingContext. - The information on consumed offset can be recovered from the checkpoint. - See the programming guide for details (constraints, etc.). - - :param ssc: StreamingContext object. - :param topics: list of topic_name to consume. - :param kafkaParams: Additional params for Kafka. - :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting - point of the stream (a dictionary mapping `TopicAndPartition` to - integers). - :param keyDecoder: A function used to decode key (default is utf8_decoder). - :param valueDecoder: A function used to decode value (default is utf8_decoder). - :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess - meta using messageHandler (default is None). - :return: A DStream object - - .. note:: Experimental - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - if fromOffsets is None: - fromOffsets = dict() - if not isinstance(topics, list): - raise TypeError("topics should be list") - if not isinstance(kafkaParams, dict): - raise TypeError("kafkaParams should be dict") - - def funcWithoutMessageHandler(k_v): - return (keyDecoder(k_v[0]), valueDecoder(k_v[1])) - - def funcWithMessageHandler(m): - m._set_key_decoder(keyDecoder) - m._set_value_decoder(valueDecoder) - return messageHandler(m) - - helper = KafkaUtils._get_helper(ssc._sc) - - jfromOffsets = dict([(k._jTopicAndPartition(helper), - v) for (k, v) in fromOffsets.items()]) - if messageHandler is None: - ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) - func = funcWithoutMessageHandler - jstream = helper.createDirectStreamWithoutMessageHandler( - ssc._jssc, kafkaParams, set(topics), jfromOffsets) - else: - ser = AutoBatchedSerializer(PickleSerializer()) - func = funcWithMessageHandler - jstream = helper.createDirectStreamWithMessageHandler( - ssc._jssc, kafkaParams, set(topics), jfromOffsets) - - stream = DStream(jstream, ssc, ser).map(func) - return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer) - - @staticmethod - def createRDD(sc, kafkaParams, offsetRanges, leaders=None, - keyDecoder=utf8_decoder, valueDecoder=utf8_decoder, - messageHandler=None): - """ - Create an RDD from Kafka using offset ranges for each topic and partition. - - :param sc: SparkContext object - :param kafkaParams: Additional params for Kafka - :param offsetRanges: list of offsetRange to specify topic:partition:[start, end) to consume - :param leaders: Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty - map, in which case leaders will be looked up on the driver. - :param keyDecoder: A function used to decode key (default is utf8_decoder) - :param valueDecoder: A function used to decode value (default is utf8_decoder) - :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess - meta using messageHandler (default is None). - :return: An RDD object - - .. note:: Experimental - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - if leaders is None: - leaders = dict() - if not isinstance(kafkaParams, dict): - raise TypeError("kafkaParams should be dict") - if not isinstance(offsetRanges, list): - raise TypeError("offsetRanges should be list") - - def funcWithoutMessageHandler(k_v): - return (keyDecoder(k_v[0]), valueDecoder(k_v[1])) - - def funcWithMessageHandler(m): - m._set_key_decoder(keyDecoder) - m._set_value_decoder(valueDecoder) - return messageHandler(m) - - helper = KafkaUtils._get_helper(sc) - - joffsetRanges = [o._jOffsetRange(helper) for o in offsetRanges] - jleaders = dict([(k._jTopicAndPartition(helper), - v._jBroker(helper)) for (k, v) in leaders.items()]) - if messageHandler is None: - jrdd = helper.createRDDWithoutMessageHandler( - sc._jsc, kafkaParams, joffsetRanges, jleaders) - ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) - rdd = RDD(jrdd, sc, ser).map(funcWithoutMessageHandler) - else: - jrdd = helper.createRDDWithMessageHandler( - sc._jsc, kafkaParams, joffsetRanges, jleaders) - rdd = RDD(jrdd, sc).map(funcWithMessageHandler) - - return KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer) - - @staticmethod - def _get_helper(sc): - try: - return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper() - except TypeError as e: - if str(e) == "'JavaPackage' object is not callable": - KafkaUtils._printErrorMsg(sc) - raise - - @staticmethod - def _printErrorMsg(sc): - print(""" -________________________________________________________________________________________________ - - Spark Streaming's Kafka libraries not found in class path. Try one of the following. - - 1. Include the Kafka library and its dependencies with in the - spark-submit command as - - $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:%s ... - - 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, - Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = %s. - Then, include the jar in the spark-submit command as - - $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ... - -________________________________________________________________________________________________ - -""" % (sc.version, sc.version)) - - -class OffsetRange(object): - """ - Represents a range of offsets from a single Kafka TopicAndPartition. - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - - def __init__(self, topic, partition, fromOffset, untilOffset): - """ - Create an OffsetRange to represent range of offsets - :param topic: Kafka topic name. - :param partition: Kafka partition id. - :param fromOffset: Inclusive starting offset. - :param untilOffset: Exclusive ending offset. - """ - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - self.topic = topic - self.partition = partition - self.fromOffset = fromOffset - self.untilOffset = untilOffset - - def __eq__(self, other): - if isinstance(other, self.__class__): - return (self.topic == other.topic - and self.partition == other.partition - and self.fromOffset == other.fromOffset - and self.untilOffset == other.untilOffset) - else: - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def __str__(self): - return "OffsetRange(topic: %s, partition: %d, range: [%d -> %d]" \ - % (self.topic, self.partition, self.fromOffset, self.untilOffset) - - def _jOffsetRange(self, helper): - return helper.createOffsetRange(self.topic, self.partition, self.fromOffset, - self.untilOffset) - - -class TopicAndPartition(object): - """ - Represents a specific topic and partition for Kafka. - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - - def __init__(self, topic, partition): - """ - Create a Python TopicAndPartition to map to the Java related object - :param topic: Kafka topic name. - :param partition: Kafka partition id. - """ - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - self._topic = topic - self._partition = partition - - def _jTopicAndPartition(self, helper): - return helper.createTopicAndPartition(self._topic, self._partition) - - def __eq__(self, other): - if isinstance(other, self.__class__): - return (self._topic == other._topic - and self._partition == other._partition) - else: - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def __hash__(self): - return (self._topic, self._partition).__hash__() - - -class Broker(object): - """ - Represent the host and port info for a Kafka broker. - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - - def __init__(self, host, port): - """ - Create a Python Broker to map to the Java related object. - :param host: Broker's hostname. - :param port: Broker's port. - """ - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - self._host = host - self._port = port - - def _jBroker(self, helper): - return helper.createBroker(self._host, self._port) - - -class KafkaRDD(RDD): - """ - A Python wrapper of KafkaRDD, to provide additional information on normal RDD. - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - - def __init__(self, jrdd, ctx, jrdd_deserializer): - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - RDD.__init__(self, jrdd, ctx, jrdd_deserializer) - - def offsetRanges(self): - """ - Get the OffsetRange of specific KafkaRDD. - :return: A list of OffsetRange - """ - helper = KafkaUtils._get_helper(self.ctx) - joffsetRanges = helper.offsetRangesOfKafkaRDD(self._jrdd.rdd()) - ranges = [OffsetRange(o.topic(), o.partition(), o.fromOffset(), o.untilOffset()) - for o in joffsetRanges] - return ranges - - -class KafkaDStream(DStream): - """ - A Python wrapper of KafkaDStream - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - - def __init__(self, jdstream, ssc, jrdd_deserializer): - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - DStream.__init__(self, jdstream, ssc, jrdd_deserializer) - - def foreachRDD(self, func): - """ - Apply a function to each RDD in this DStream. - """ - if func.__code__.co_argcount == 1: - old_func = func - func = lambda r, rdd: old_func(rdd) - jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) \ - .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser)) - api = self._ssc._jvm.PythonDStream - api.callForeachRDD(self._jdstream, jfunc) - - def transform(self, func): - """ - Return a new DStream in which each RDD is generated by applying a function - on each RDD of this DStream. - - `func` can have one argument of `rdd`, or have two arguments of - (`time`, `rdd`) - """ - if func.__code__.co_argcount == 1: - oldfunc = func - func = lambda t, rdd: oldfunc(rdd) - assert func.__code__.co_argcount == 2, "func should take one or two arguments" - - return KafkaTransformedDStream(self, func) - - -class KafkaTransformedDStream(TransformedDStream): - """ - Kafka specific wrapper of TransformedDStream to transform on Kafka RDD. - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - - def __init__(self, prev, func): - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - TransformedDStream.__init__(self, prev, func) - - @property - def _jdstream(self): - if self._jdstream_val is not None: - return self._jdstream_val - - jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer) \ - .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser)) - dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc) - self._jdstream_val = dstream.asJavaDStream() - return self._jdstream_val - - -class KafkaMessageAndMetadata(object): - """ - Kafka message and metadata information. Including topic, partition, offset and message - - .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. - See SPARK-21893. - """ - - def __init__(self, topic, partition, offset, key, message): - """ - Python wrapper of Kafka MessageAndMetadata - :param topic: topic name of this Kafka message - :param partition: partition id of this Kafka message - :param offset: Offset of this Kafka message in the specific partition - :param key: key payload of this Kafka message, can be null if this Kafka message has no key - specified, the return data is undecoded bytearry. - :param message: actual message payload of this Kafka message, the return data is - undecoded bytearray. - """ - warnings.warn( - "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. " - "See SPARK-21893.", - DeprecationWarning) - self.topic = topic - self.partition = partition - self.offset = offset - self._rawKey = key - self._rawMessage = message - self._keyDecoder = utf8_decoder - self._valueDecoder = utf8_decoder - - def __str__(self): - return "KafkaMessageAndMetadata(topic: %s, partition: %d, offset: %d, key and message...)" \ - % (self.topic, self.partition, self.offset) - - def __repr__(self): - return self.__str__() - - def __reduce__(self): - return (KafkaMessageAndMetadata, - (self.topic, self.partition, self.offset, self._rawKey, self._rawMessage)) - - def _set_key_decoder(self, decoder): - self._keyDecoder = decoder - - def _set_value_decoder(self, decoder): - self._valueDecoder = decoder - - @property - def key(self): - return self._keyDecoder(self._rawKey) - - @property - def message(self): - return self._valueDecoder(self._rawMessage) http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 4b995c0..8df00bc 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -47,7 +47,6 @@ if sys.version >= "3": from pyspark.context import SparkConf, SparkContext, RDD from pyspark.storagelevel import StorageLevel from pyspark.streaming.context import StreamingContext -from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream from pyspark.streaming.listener import StreamingListener @@ -1047,259 +1046,6 @@ class CheckpointTests(unittest.TestCase): self.ssc.stop(True, True) -class KafkaStreamTests(PySparkStreamingTestCase): - timeout = 20 # seconds - duration = 1 - - def setUp(self): - super(KafkaStreamTests, self).setUp() - self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils() - self._kafkaTestUtils.setup() - - def tearDown(self): - super(KafkaStreamTests, self).tearDown() - - if self._kafkaTestUtils is not None: - self._kafkaTestUtils.teardown() - self._kafkaTestUtils = None - - def _randomTopic(self): - return "topic-%d" % random.randint(0, 10000) - - def _validateStreamResult(self, sendData, stream): - result = {} - for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]), - sum(sendData.values()))): - result[i] = result.get(i, 0) + 1 - - self.assertEqual(sendData, result) - - def _validateRddResult(self, sendData, rdd): - result = {} - for i in rdd.map(lambda x: x[1]).collect(): - result[i] = result.get(i, 0) + 1 - self.assertEqual(sendData, result) - - def test_kafka_stream(self): - """Test the Python Kafka stream API.""" - topic = self._randomTopic() - sendData = {"a": 3, "b": 5, "c": 10} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - - stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), - "test-streaming-consumer", {topic: 1}, - {"auto.offset.reset": "smallest"}) - self._validateStreamResult(sendData, stream) - - def test_kafka_direct_stream(self): - """Test the Python direct Kafka stream API.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2, "c": 3} - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), - "auto.offset.reset": "smallest"} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - - stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams) - self._validateStreamResult(sendData, stream) - - def test_kafka_direct_stream_from_offset(self): - """Test the Python direct Kafka stream API with start offset specified.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2, "c": 3} - fromOffsets = {TopicAndPartition(topic, 0): long(0)} - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - - stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets) - self._validateStreamResult(sendData, stream) - - def test_kafka_rdd(self): - """Test the Python direct Kafka RDD API.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2} - offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) - self._validateRddResult(sendData, rdd) - - def test_kafka_rdd_with_leaders(self): - """Test the Python direct Kafka RDD API with leaders.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2, "c": 3} - offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} - address = self._kafkaTestUtils.brokerAddress().split(":") - leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) - self._validateRddResult(sendData, rdd) - - def test_kafka_rdd_get_offsetRanges(self): - """Test Python direct Kafka RDD get OffsetRanges.""" - topic = self._randomTopic() - sendData = {"a": 3, "b": 4, "c": 5} - offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) - self.assertEqual(offsetRanges, rdd.offsetRanges()) - - def test_kafka_direct_stream_foreach_get_offsetRanges(self): - """Test the Python direct Kafka stream foreachRDD get offsetRanges.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2, "c": 3} - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), - "auto.offset.reset": "smallest"} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - - stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams) - - offsetRanges = [] - - def getOffsetRanges(_, rdd): - for o in rdd.offsetRanges(): - offsetRanges.append(o) - - stream.foreachRDD(getOffsetRanges) - self.ssc.start() - self.wait_for(offsetRanges, 1) - - self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) - - def test_kafka_direct_stream_transform_get_offsetRanges(self): - """Test the Python direct Kafka stream transform get offsetRanges.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2, "c": 3} - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), - "auto.offset.reset": "smallest"} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - - stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams) - - offsetRanges = [] - - def transformWithOffsetRanges(rdd): - for o in rdd.offsetRanges(): - offsetRanges.append(o) - return rdd - - # Test whether it is ok mixing KafkaTransformedDStream and TransformedDStream together, - # only the TransformedDstreams can be folded together. - stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint() - self.ssc.start() - self.wait_for(offsetRanges, 1) - - self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) - - def test_topic_and_partition_equality(self): - topic_and_partition_a = TopicAndPartition("foo", 0) - topic_and_partition_b = TopicAndPartition("foo", 0) - topic_and_partition_c = TopicAndPartition("bar", 0) - topic_and_partition_d = TopicAndPartition("foo", 1) - - self.assertEqual(topic_and_partition_a, topic_and_partition_b) - self.assertNotEqual(topic_and_partition_a, topic_and_partition_c) - self.assertNotEqual(topic_and_partition_a, topic_and_partition_d) - - def test_kafka_direct_stream_transform_with_checkpoint(self): - """Test the Python direct Kafka stream transform with checkpoint correctly recovered.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2, "c": 3} - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), - "auto.offset.reset": "smallest"} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - - offsetRanges = [] - - def transformWithOffsetRanges(rdd): - for o in rdd.offsetRanges(): - offsetRanges.append(o) - return rdd - - self.ssc.stop(False) - self.ssc = None - tmpdir = "checkpoint-test-%d" % random.randint(0, 10000) - - def setup(): - ssc = StreamingContext(self.sc, 0.5) - ssc.checkpoint(tmpdir) - stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) - stream.transform(transformWithOffsetRanges).count().pprint() - return ssc - - try: - ssc1 = StreamingContext.getOrCreate(tmpdir, setup) - ssc1.start() - self.wait_for(offsetRanges, 1) - self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) - - # To make sure some checkpoint is written - time.sleep(3) - ssc1.stop(False) - ssc1 = None - - # Restart again to make sure the checkpoint is recovered correctly - ssc2 = StreamingContext.getOrCreate(tmpdir, setup) - ssc2.start() - ssc2.awaitTermination(3) - ssc2.stop(stopSparkContext=False, stopGraceFully=True) - ssc2 = None - finally: - shutil.rmtree(tmpdir) - - def test_kafka_rdd_message_handler(self): - """Test Python direct Kafka RDD MessageHandler.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 1, "c": 2} - offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} - - def getKeyAndDoubleMessage(m): - return m and (m.key, m.message * 2) - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, - messageHandler=getKeyAndDoubleMessage) - self._validateRddResult({"aa": 1, "bb": 1, "cc": 2}, rdd) - - def test_kafka_direct_stream_message_handler(self): - """Test the Python direct Kafka stream MessageHandler.""" - topic = self._randomTopic() - sendData = {"a": 1, "b": 2, "c": 3} - kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), - "auto.offset.reset": "smallest"} - - self._kafkaTestUtils.createTopic(topic) - self._kafkaTestUtils.sendMessages(topic, sendData) - - def getKeyAndDoubleMessage(m): - return m and (m.key, m.message * 2) - - stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, - messageHandler=getKeyAndDoubleMessage) - self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream) - - class KinesisStreamTests(PySparkStreamingTestCase): def test_kinesis_stream_api(self): @@ -1371,23 +1117,6 @@ def search_jar(dir, name_prefix): return [jar for jar in jars if not jar.endswith(ignored_jar_suffixes)] -def search_kafka_assembly_jar(): - SPARK_HOME = os.environ["SPARK_HOME"] - kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-0-8-assembly") - jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-0-8-assembly") - if not jars: - raise Exception( - ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + - "You need to build Spark with " - "'build/sbt -Pkafka-0-8 assembly/package streaming-kafka-0-8-assembly/assembly' or " - "'build/mvn -DskipTests -Pkafka-0-8 package' before running this test.") - elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please " - "remove all but one") % (", ".join(jars))) - else: - return jars[0] - - def _kinesis_asl_assembly_dir(): SPARK_HOME = os.environ["SPARK_HOME"] return os.path.join(SPARK_HOME, "external/kinesis-asl-assembly") @@ -1404,38 +1133,26 @@ def search_kinesis_asl_assembly_jar(): return jars[0] -# Must be same as the variable and condition defined in modules.py -kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS" -are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1' # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py kinesis_test_environ_var = "ENABLE_KINESIS_TESTS" are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1' if __name__ == "__main__": from pyspark.streaming.tests import * - kafka_assembly_jar = search_kafka_assembly_jar() kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar() if kinesis_asl_assembly_jar is None: kinesis_jar_present = False - jars = kafka_assembly_jar + jars_args = "" else: kinesis_jar_present = True - jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar) + jars_args = "--jars %s" % kinesis_asl_assembly_jar existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell") - jars_args = "--jars %s" % jars os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args]) testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests, StreamingListenerTests] - if are_kafka_tests_enabled: - testcases.append(KafkaStreamTests) - else: - sys.stderr.write( - "Skipped test_kafka_stream (enable by setting environment variable %s=1" - % kafka_test_environ_var) - if kinesis_jar_present is True: testcases.append(KinesisStreamTests) elif are_kinesis_tests_enabled is False: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
