[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15270203#comment-15270203 ]

Mario Briggs commented on SPARK-15089:
--------------------------------------

Kafka supports SSL only with the 0.9.x Kafka client APIs. The Spark-Kafka 
connector you are exercising (KafkaUtils.createDirectStream) uses the 0.8 Kafka 
APIs, which do not support SSL. Your standalone Kafka consumer program is 
using the 0.9 Kafka client.

If you are willing to live on the edge, this PR has code that uses the 
Kafka 0.9 client APIs: https://github.com/apache/spark/pull/11863. The 
corresponding JIRA is https://issues.apache.org/jira/browse/SPARK-12177
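For reference, the SSL settings themselves are the standard 0.9 consumer properties; a minimal sketch of what a standalone 0.9 client's consumer.properties might look like (host, paths, and passwords are placeholders taken from the report, and it assumes the broker already exposes an SSL listener):

{code}
bootstrap.servers=broker-host:9093
security.protocol=SSL
ssl.truststore.type=JKS
ssl.truststore.location=/opt/cert/client.truststore.jks
ssl.truststore.password=password
# keystore entries are needed only if the broker requires client authentication
ssl.keystore.type=JKS
ssl.keystore.location=/opt/cert/keystore.jks
ssl.keystore.password=password
ssl.key.password=password
{code}

Passing these same keys through kafkaParams has no effect with the 0.8-based createDirectStream, since the 0.8 client simply ignores them.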

> kafka-spark consumer with SSL problem
> -------------------------------------
>
>                 Key: SPARK-15089
>                 URL: https://issues.apache.org/jira/browse/SPARK-15089
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.1
>            Reporter: JasonChang
>
> I am not sure Spark Streaming supports SSL.
> I tried to add the params to kafkaParams, but it did not work.
> {code}
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000));
> Set<String> topicmap = new HashSet<String>();
> topicmap.add(kafkaTopic);
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url);
> kafkaParams.put("security.protocol", "SSL");
> kafkaParams.put("ssl.keystore.type", "JKS");
> kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks");
> kafkaParams.put("ssl.keystore.password", "password");
> kafkaParams.put("ssl.key.password", "password");
> kafkaParams.put("ssl.truststore.type", "JKS");
> kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks");
> kafkaParams.put("ssl.truststore.password", "password");
> kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic);
> JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(jsc,
>               String.class,
>               String.class,
>               StringDecoder.class,
>               StringDecoder.class,
>               kafkaParams,
>               topicmap);
> JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() {
>       public String call(Tuple2<String, String> tuple2) {
>               return tuple2._2();
>       }
> });
> {code}
> {code}
> Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>       at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>       at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>       at scala.util.Either.fold(Either.scala:97)
>       at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>       at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>       at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>       at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
>       at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
