Repository: spark
Updated Branches:
  refs/heads/master 182e11904 -> 29081b587
[SPARK-16950] [PYSPARK] fromOffsets parameter support in KafkaUtils.createDirectStream for python3

## What changes were proposed in this pull request?

Allow KafkaUtils.createDirectStream to be used with starting offsets on Python 3 by using java.lang.Number instead of Long during parameter mapping in the Scala helper. This lets py4j pass either Integer or Long into the map and resolves the ClassCastException problems.

## How was this patch tested?

Unit tests.

jerryshao - could you please look at this PR?

Author: Mariusz Strzelecki <mariusz.strzele...@allegrogroup.com>

Closes #14540 from szczeles/kafka_pyspark.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29081b58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29081b58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29081b58

Branch: refs/heads/master
Commit: 29081b587f3423bf5a3e0066357884d0c26a04bf
Parents: 182e119
Author: Mariusz Strzelecki <mariusz.strzele...@allegrogroup.com>
Authored: Tue Aug 9 09:44:43 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Aug 9 09:44:43 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/kafka/KafkaUtils.scala |  8 ++++----
 python/pyspark/streaming/kafka.py                     |  3 +++
 python/pyspark/streaming/tests.py                     | 12 +++---------
 3 files changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29081b58/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index edaafb9..b17e198 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong}
+import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
 import java.nio.charset.StandardCharsets
 import java.util.{List => JList, Map => JMap, Set => JSet}
 
@@ -682,7 +682,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+      fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[(Array[Byte], Array[Byte])] = {
     val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
       (mmd.key, mmd.message)
     new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
@@ -692,7 +692,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+      fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] = {
     val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
       new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
     val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
@@ -704,7 +704,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong],
+      fromOffsets: JMap[TopicAndPartition, JNumber],
       messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
     val currentFromOffsets = if (!fromOffsets.isEmpty) {
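For readers following along, here is a minimal sketch of the Python 3 call this change enables. The application name, broker address, topic name, and starting offset below are illustrative assumptions; only the KafkaUtils.createDirectStream and TopicAndPartition API comes from the patch itself.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = SparkContext(appName="from-offsets-demo")  # hypothetical app name
ssc = StreamingContext(sc, 2)

kafkaParams = {"metadata.broker.list": "localhost:9092"}  # assumed broker

# Python 3 has no `long`; a plain int crosses py4j as java.lang.Integer
# (or java.lang.Long once it exceeds 32 bits). With the helper's map now
# typed as java.lang.Number, both arrive without a ClassCastException.
fromOffsets = {TopicAndPartition("test-topic", 0): 0}

stream = KafkaUtils.createDirectStream(ssc, ["test-topic"], kafkaParams, fromOffsets)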
http://git-wip-us.apache.org/repos/asf/spark/blob/29081b58/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 2c1a667..bf27d80 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -287,6 +287,9 @@ class TopicAndPartition(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    def __hash__(self):
+        return (self._topic, self._partition).__hash__()
+
 
 class Broker(object):
     """
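A note on the __hash__ addition above: on Python 3, a class that defines __eq__ without also defining __hash__ has its __hash__ set to None, so its instances become unhashable, yet fromOffsets uses TopicAndPartition objects as dictionary keys. The following self-contained sketch shows the failure mode this avoids; the class body mirrors the relevant methods rather than importing PySpark.

class TopicAndPartition(object):
    def __init__(self, topic, partition):
        self._topic = topic
        self._partition = partition

    def __eq__(self, other):
        return (isinstance(other, TopicAndPartition)
                and self._topic == other._topic
                and self._partition == other._partition)

    def __ne__(self, other):
        return not self.__eq__(other)

    # Without this method, Python 3 sets __hash__ to None (defining __eq__
    # suppresses the inherited hash), and building the dict below would
    # raise "TypeError: unhashable type".
    def __hash__(self):
        return (self._topic, self._partition).__hash__()

offsets = {TopicAndPartition("test-topic", 0): 0}  # works on Python 2 and 3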
http://git-wip-us.apache.org/repos/asf/spark/blob/29081b58/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 360ba1e..5ac007c 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -41,6 +41,9 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
+if sys.version >= "3":
+    long = int
+
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
@@ -1058,7 +1061,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_from_offset(self):
         """Test the Python direct Kafka stream API with start offset specified."""
         topic = self._randomTopic()
@@ -1072,7 +1074,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd(self):
         """Test the Python direct Kafka RDD API."""
         topic = self._randomTopic()
@@ -1085,7 +1086,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_with_leaders(self):
         """Test the Python direct Kafka RDD API with leaders."""
         topic = self._randomTopic()
@@ -1100,7 +1100,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_get_offsetRanges(self):
         """Test Python direct Kafka RDD get OffsetRanges."""
         topic = self._randomTopic()
@@ -1113,7 +1112,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self.assertEqual(offsetRanges, rdd.offsetRanges())
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_foreach_get_offsetRanges(self):
         """Test the Python direct Kafka stream foreachRDD get offsetRanges."""
         topic = self._randomTopic()
@@ -1138,7 +1136,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_transform_get_offsetRanges(self):
         """Test the Python direct Kafka stream transform get offsetRanges."""
         topic = self._randomTopic()
@@ -1176,7 +1173,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_transform_with_checkpoint(self):
         """Test the Python direct Kafka stream transform with checkpoint correctly recovered."""
         topic = self._randomTopic()
@@ -1225,7 +1221,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         finally:
             shutil.rmtree(tmpdir)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_message_handler(self):
         """Test Python direct Kafka RDD MessageHandler."""
         topic = self._randomTopic()
@@ -1242,7 +1237,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
             messageHandler=getKeyAndDoubleMessage)
         self._validateRddResult({"aa": 1, "bb": 1, "cc": 2}, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_message_handler(self):
         """Test the Python direct Kafka stream MessageHandler."""
         topic = self._randomTopic()
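The `long = int` alias added near the top of tests.py is a common 2/3 compatibility shim: Python 3 removed the `long` builtin, so aliasing it to `int` lets the unchanged long(0) / long(6) calls in the assertions run under either interpreter. A small sketch of the idiom:

import sys

# Python 3 removed the `long` builtin; alias it to `int` so code written
# against Python 2's long() constructor keeps working unchanged.
if sys.version >= "3":
    long = int

bounds = (long(0), long(6))  # plain ints on Python 3, longs on Python 2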