What command did you use to launch your Spark application? The deploying section of the documentation (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying) suggests using spark-submit with the `--packages` flag to include the required Kafka package, e.g.:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...

On Mon, Nov 20, 2017 at 3:07 PM, salemi <alireza.sal...@udo.edu> wrote:
> Hi All,
>
> we are trying to use the DataFrames approach with Kafka 0.10 and PySpark 2.2.0.
> We followed the instructions on the wiki
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.
> We coded something similar to the code below using Python:
>
>     df = spark \
>       .read \
>       .format("kafka") \
>       .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>       .option("subscribe", "topic1") \
>       .load()
>
> But we are getting the exception below. Does PySpark 2.2.0 support
> DataFrames with Kafka 0.10? If yes, what could be the root cause of the
> exception below?
>
> Thank you,
> Ali
>
> Exception:
> py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
> : java.lang.ClassNotFoundException: Failed to find data source: kafka.
> Please find packages at http://spark.apache.org/third-party-projects.html
>         at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:549)
>         at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
>         at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
>         at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:195)
>         at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
>         at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
>         at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>         at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>         at py4j.Gateway.invoke(Gateway.java:280)
>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:214)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
>         at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
>         at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
>         at scala.util.Try.orElse(Try.scala:84)
>         at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)
>         ... 18 more
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Twitter: https://twitter.com/holdenkarau
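
P.S. For a PySpark script, the same `--packages` flag goes on the spark-submit line that launches the script. A minimal sketch, assuming a Spark 2.2.0 build on Scala 2.11 as in the question (the script name your_kafka_app.py is hypothetical):

```shell
# Launch a PySpark script with the Kafka source on the driver/executor classpath.
# The Maven coordinate is groupId:artifactId_scalaVersion:sparkVersion and must
# match the Spark build you are running (here Scala 2.11, Spark 2.2.0).
./bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
  your_kafka_app.py
```

Without that package the kafka data source is simply not on the classpath, which is what the `Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource` line in your trace indicates.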