You should not need to include jars for Kafka; the Spark connectors
have the appropriate transitive dependency on the correct version.
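
For illustration, here is one way to do that (a sketch; the artifact
versions are assumptions and should match your Spark build). With
spark-shell you can pull in only the connector and let the Kafka client
come in transitively:

  spark-shell --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.1

or, in an sbt build, declare only the connector:

  libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"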

On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
> Hi,
>
> Not sure if this will help at all, and please take it with a pinch of salt,
> as I don't have your setup and I am not running on a cluster.
>
> I have tried to run a Kafka example, which was originally working on Spark
> 1.6.1, on Spark 2.
> These are the jars I am using:
>
> spark-streaming-kafka-0-10_2.11-2.0.1.jar
>
> kafka_2.11-0.10.1.1.jar
>
>
> And here's the code up to the creation of the Direct Stream. Apparently,
> with the new version of the Kafka libs, some properties have to be
> specified explicitly.
>
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.storage.StorageLevel
>
> import java.util.regex.Pattern
> import java.util.regex.Matcher
>
> import Utilities._
>
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> /** Working example of listening for log data from Kafka's testLogs topic on port 9092. */
> object KafkaExample {
>
>   def main(args: Array[String]) {
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
>
>     setupLogging()
>
>     // Construct a regular expression (regex) to extract fields from raw Apache log lines
>     val pattern = apacheLogPattern()
>
>     // The 0-10 consumer needs explicit deserializers and a group id;
>     // the old 0.8 property "metadata.broker.list" is replaced by
>     // "bootstrap.servers" and is no longer needed.
>     val kafkaParams = Map(
>       "bootstrap.servers" -> "localhost:9092",
>       "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
>       "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
>       "group.id" -> "group1")
>     val topics = List("testLogs").toSet
>     val lines = KafkaUtils.createDirectStream[String, String](
>       ssc,
>       PreferConsistent,
>       Subscribe[String, String](topics, kafkaParams)
>     ).map(cr => cr.value())
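>
>     // (A minimal sketch of how the example might continue, not part of
>     // the original message: print the stream and start the context so
>     // the snippet runs end to end.)
>     lines.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }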
>
> hth
>
>  marco
>
> On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>>
>> I am getting this error with Spark 2; the same code works with CDH 5.5.1
>> (Spark 1.5).
>>
>> Admittedly I am messing around with spark-shell. However, I am surprised
>> that this does not work with Spark 2 when it is fine with CDH 5.5.1.
>>
>> scala>     val dstream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$
>>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>   at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>   at
>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>>   at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>   ... 74 elided
>>
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
