[
https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15270203#comment-15270203
]
Mario Briggs commented on SPARK-15089:
--------------------------------------
Kafka supports SSL only with the 0.9.x Kafka client APIs. The Spark-Kafka
connector you are exercising (KafkaUtils.createDirectStream) uses the 0.8 Kafka
APIs, which do not support SSL. Your standalone Kafka consumer program is
using the 0.9 Kafka client.
If you are willing to live on the edge, this PR has code that uses
the Kafka 0.9 client API: https://github.com/apache/spark/pull/11863. The
JIRA is https://issues.apache.org/jira/browse/SPARK-12177
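
For comparison, here is a minimal sketch of the SSL settings a standalone 0.9
consumer would typically be given. It assumes the broker only needs the client
to trust its certificate; the class and method names are hypothetical, and the
path and password in the usage note are the placeholder values from the issue
description. The keys are written as plain strings so the block compiles
without the Kafka jar on the classpath.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Spark or Kafka.
class SslConsumerConfigSketch {

    // Builds consumer properties using the Kafka 0.9 "new consumer" config keys.
    static Properties sslConsumerProps(String bootstrapServers,
                                       String groupId,
                                       String truststorePath,
                                       String truststorePassword) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // SSL settings: these are only honoured by the 0.9+ client.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);

        // If the broker requires client authentication, the keystore settings
        // (ssl.keystore.location, ssl.keystore.password, ssl.key.password)
        // would be added here as well.
        return props;
    }
}
```

With kafka-clients 0.9 or later on the classpath, the result of
sslConsumerProps("host:9093", "my-group", "/opt/cert/client.truststore.jks", "password")
could be passed to new KafkaConsumer<String, String>(props). The same keys put
into kafkaParams for the 0.8-based createDirectStream are not acted on.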
> kafka-spark consumer with SSL problem
> -------------------------------------
>
> Key: SPARK-15089
> URL: https://issues.apache.org/jira/browse/SPARK-15089
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.1
> Reporter: JasonChang
>
> I am not sure whether Spark Streaming supports SSL.
> I tried adding SSL params to kafkaParams, but it does not work.
> {code}
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new
> Duration(10000));
> Set<String> topicmap = new HashSet<String>();
> topicmap.add(kafkaTopic);
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url);
> kafkaParams.put("security.protocol", "SSL");
> kafkaParams.put("ssl.keystore.type", "JKS");
> kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks");
> kafkaParams.put("ssl.keystore.password", "password");
> kafkaParams.put("ssl.key.password", "password");
> kafkaParams.put("ssl.truststore.type", "JKS");
> kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks");
> kafkaParams.put("ssl.truststore.password", "password");
> kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic);
> JavaPairInputDStream<String, String> stream =
> KafkaUtils.createDirectStream(jsc,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams,
> topicmap
> );
> JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>,
> String>() {
> public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
> }
> });
> {code}
> {code}
> Exception in thread "main" org.apache.spark.SparkException:
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
> at
> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
> {code}