Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2f5eaa9f8 -> 02cf178bb


[SPARK-19185][DSTREAM] Make Kafka consumer cache configurable

## What changes were proposed in this pull request?

Add a new property, `spark.streaming.kafka.consumer.cache.enabled`, that lets
users enable or disable the cache for Kafka consumers. Disabling the cache is
useful as a workaround for issues such as SPARK-19185, for which no fix has
been committed yet. The cache remains enabled by default, so this change does
not alter out-of-the-box behavior.
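
As a minimal sketch of how an application might opt out of the cache, assuming
the property is set on the `SparkConf` before the streaming context is created
(the object name and application name below are placeholders for illustration,
not part of this patch):

```scala
import org.apache.spark.SparkConf

// Hypothetical driver object, used only to illustrate the new property.
object DisableKafkaConsumerCache {
  def main(args: Array[String]): Unit = {
    // The application name is a placeholder chosen for this sketch.
    val conf = new SparkConf()
      .setAppName("kafka-direct-stream-example")
      // Turn off cached Kafka consumers; the property defaults to true.
      .set("spark.streaming.kafka.consumer.cache.enabled", "false")

    // Same lookup that DirectKafkaInputDStream performs in this patch.
    val useConsumerCache =
      conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled", true)
    println(s"consumer cache enabled: $useConsumerCache")
  }
}
```

The same setting can also be passed at submit time with
`--conf spark.streaming.kafka.consumer.cache.enabled=false`.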

## How was this patch tested?
Ran the unit tests.

Author: Mark Grover <m...@apache.org>
Author: Mark Grover <grover.markgro...@gmail.com>

Closes #18234 from markgrover/spark-19185.

(cherry picked from commit 55b8cfe6e6a6759d65bf219ff570fd6154197ec4)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02cf178b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02cf178b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02cf178b

Branch: refs/heads/branch-2.2
Commit: 02cf178bb2a7dc8b4c06eb040c44b6453e41ed15
Parents: 2f5eaa9
Author: Mark Grover <m...@apache.org>
Authored: Thu Jun 8 09:55:43 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Jun 8 09:55:54 2017 -0700

----------------------------------------------------------------------
 docs/streaming-kafka-0-10-integration.md                     | 4 +++-
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala   | 8 +++++---
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/02cf178b/docs/streaming-kafka-0-10-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 92c296a..386066a 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -91,7 +91,9 @@ The new Kafka consumer API will pre-fetch messages into buffers.  Therefore it i
 
 In most cases, you should use `LocationStrategies.PreferConsistent` as shown above.  This will distribute partitions evenly across available executors.  If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition.  Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).
 
-The cache for consumers has a default maximum size of 64.  If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64.  If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.
+
+If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`. Disabling the cache may be needed to workaround the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.
 
 The cache is keyed by topicpartition and group.id, so use a **separate** `group.id` for each call to `createDirectStream`.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/02cf178b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 6d6983c..9a4a1cf 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -213,8 +213,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       val fo = currentOffsets(tp)
       OffsetRange(tp.topic, tp.partition, fo, uo)
     }
-    val rdd = new KafkaRDD[K, V](
-      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
+      true)
+    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
+      getPreferredHosts, useConsumerCache)
 
     // Report the record number and metadata of this batch interval to InputInfoTracker.
     val description = offsetRanges.filter { offsetRange =>
@@ -316,7 +318,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
            b.map(OffsetRange(_)),
            getPreferredHosts,
           // during restore, it's possible same partition will be consumed from multiple
-           // threads, so dont use cache
+           // threads, so do not use cache.
            false
          )
       }

