Repository: spark
Updated Branches:
  refs/heads/master 182e11904 -> 29081b587


[SPARK-16950] [PYSPARK] fromOffsets parameter support in KafkaUtils.createDirectStream for python3

## What changes were proposed in this pull request?

Enables KafkaUtils.createDirectStream with starting offsets under Python 3 by
using java.lang.Number instead of java.lang.Long for the parameter mapping in
the Scala helper. py4j can then pass either an Integer or a Long into the map,
which resolves the ClassCastException problems.
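
A minimal usage sketch mirroring the updated test (it assumes an existing
StreamingContext `ssc`, a topic name `topic`, and a `kafkaParams` dict; these
names are placeholders, not part of the patch):

```python
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# Plain Python 3 ints now work as starting offsets; no long() needed.
# ssc, topic and kafkaParams are assumed to already exist.
fromOffsets = {TopicAndPartition(topic, 0): 0}
stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams, fromOffsets)
```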

## How was this patch tested?

Unit tests. The Python 3 skip markers on the Kafka offset tests were removed,
so these code paths are now exercised under both Python 2 and Python 3.

jerryshao - could you please look at this PR?

Author: Mariusz Strzelecki <mariusz.strzele...@allegrogroup.com>

Closes #14540 from szczeles/kafka_pyspark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29081b58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29081b58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29081b58

Branch: refs/heads/master
Commit: 29081b587f3423bf5a3e0066357884d0c26a04bf
Parents: 182e119
Author: Mariusz Strzelecki <mariusz.strzele...@allegrogroup.com>
Authored: Tue Aug 9 09:44:43 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Aug 9 09:44:43 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/kafka/KafkaUtils.scala   |  8 ++++----
 python/pyspark/streaming/kafka.py                       |  3 +++
 python/pyspark/streaming/tests.py                       | 12 +++---------
 3 files changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29081b58/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index edaafb9..b17e198 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong}
+import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
 import java.nio.charset.StandardCharsets
 import java.util.{List => JList, Map => JMap, Set => JSet}
 
@@ -682,7 +682,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+      fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[(Array[Byte], Array[Byte])] = {
     val messageHandler =
       (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
     new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
@@ -692,7 +692,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+      fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] = {
     val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
       new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
     val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
@@ -704,7 +704,7 @@ private[kafka] class KafkaUtilsPythonHelper {
       jssc: JavaStreamingContext,
       kafkaParams: JMap[String, String],
       topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JLong],
+      fromOffsets: JMap[TopicAndPartition, JNumber],
       messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
 
     val currentFromOffsets = if (!fromOffsets.isEmpty) {
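
Why Number rather than Long: py4j picks the Java box type by value, so the
same Python int may arrive boxed as java.lang.Integer or java.lang.Long, and a
map typed with JLong rejects the Integer case. A small sketch of that boxing
rule (illustrative only, not Spark source):

```python
# Illustrative only: mirrors py4j's choice of Java box for a Python int.
def java_box_for(value):
    # py4j sends values within 32-bit range as java.lang.Integer and
    # larger ones as java.lang.Long; both extend java.lang.Number.
    return "java.lang.Integer" if -2**31 <= value < 2**31 else "java.lang.Long"

print(java_box_for(5))      # Integer: hit ClassCastException against JLong
print(java_box_for(2**40))  # Long: always worked
```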

http://git-wip-us.apache.org/repos/asf/spark/blob/29081b58/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 2c1a667..bf27d80 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -287,6 +287,9 @@ class TopicAndPartition(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    def __hash__(self):
+        return (self._topic, self._partition).__hash__()
+
 
 class Broker(object):
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/29081b58/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 360ba1e..5ac007c 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -41,6 +41,9 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
+if sys.version >= "3":
+    long = int
+
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
@@ -1058,7 +1061,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_from_offset(self):
         """Test the Python direct Kafka stream API with start offset 
specified."""
         topic = self._randomTopic()
@@ -1072,7 +1074,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd(self):
         """Test the Python direct Kafka RDD API."""
         topic = self._randomTopic()
@@ -1085,7 +1086,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_with_leaders(self):
         """Test the Python direct Kafka RDD API with leaders."""
         topic = self._randomTopic()
@@ -1100,7 +1100,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_get_offsetRanges(self):
         """Test Python direct Kafka RDD get OffsetRanges."""
         topic = self._randomTopic()
@@ -1113,7 +1112,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self.assertEqual(offsetRanges, rdd.offsetRanges())
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_foreach_get_offsetRanges(self):
         """Test the Python direct Kafka stream foreachRDD get offsetRanges."""
         topic = self._randomTopic()
@@ -1138,7 +1136,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_transform_get_offsetRanges(self):
         """Test the Python direct Kafka stream transform get offsetRanges."""
         topic = self._randomTopic()
@@ -1176,7 +1173,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_transform_with_checkpoint(self):
         """Test the Python direct Kafka stream transform with checkpoint 
correctly recovered."""
         topic = self._randomTopic()
@@ -1225,7 +1221,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         finally:
             shutil.rmtree(tmpdir)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_message_handler(self):
         """Test Python direct Kafka RDD MessageHandler."""
         topic = self._randomTopic()
@@ -1242,7 +1237,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
                                    messageHandler=getKeyAndDoubleMessage)
         self._validateRddResult({"aa": 1, "bb": 1, "cc": 2}, rdd)
 
-    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_direct_stream_message_handler(self):
         """Test the Python direct Kafka stream MessageHandler."""
         topic = self._randomTopic()
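
The removed skipIf markers rely on the small shim added at the top of
tests.py: Python 3 has no separate long type, so aliasing the name to int
keeps the existing long(...) calls in the assertions working. Sketched
standalone:

```python
import sys

# Shim from the patch: Python 3 unified int and long, so alias the name
# and the existing long(...) calls run unchanged under both versions.
if sys.version >= "3":
    long = int

print(long(6) == 6)  # True under both Python 2 and Python 3
```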

