Encoders can only map data into an object if the columns already exist. When you read from Kafka, you just get a binary blob, so you'll need to help Spark parse that first. Assuming your data is stored as JSON, it should be pretty straightforward.
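In case it helps to see those two steps outside of Spark: conceptually, the pipeline first decodes the raw bytes to a string and then parses that string against a schema. A plain-Java illustration (no Spark involved; the crude extractField below just stands in for a real JSON parser and is purely illustrative):

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: what the cast-and-parse step amounts to.
// Kafka hands Spark the message value as raw bytes; from_json then
// needs a schema to turn the decoded string into named columns.
public class KafkaValueDecode {

    // Crude single-field extraction standing in for from_json;
    // a real job would use a proper JSON parser.
    static String extractField(String json, String field) {
        String key = "\"" + field + "\":\"";
        int start = json.indexOf(key);
        if (start < 0) return null;
        start += key.length();
        int end = json.indexOf('"', start);
        return json.substring(start, end);
    }

    public static void main(String[] args) {
        // What arrives from Kafka: a byte[] payload.
        byte[] rawValue = "{\"tweetId\":\"1\",\"location\":\"NYC\"}"
                .getBytes(StandardCharsets.UTF_8);
        // Step 1: the equivalent of col("value").cast("string")
        String json = new String(rawValue, StandardCharsets.UTF_8);
        // Step 2: the equivalent of from_json(..., tweetSchema), for one field
        System.out.println(extractField(json, "location"));
    }
}
```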
streams = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    // cast the binary value to a string and parse it as json
    .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
    // unnest the json
    .select("message.*")
    // only required if you want to use lambda functions on the data using this class
    .as(Encoders.bean(Tweet.class));

Here is some more info on working with JSON and other semi-structured formats:
<https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>

On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com> wrote:

> Hi,
>
> Currently, I am encountering the following exception while working with
> the below-mentioned code snippet.
>
> Please suggest the correct approach for reading the stream into a sql
> schema.
>
> If I add 'tweetSchema' while reading the stream, it errors out with the
> message - we cannot change the static schema for kafka.
>
> -------------------------------------------------------------------------------------------
>
> *exception*
>
> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
> '`location`' given input columns: [topic, timestamp, key, offset, value,
> timestampType, partition]*;
>     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>
> -------------------------------------------------------------------------------------------
>
> *structured streaming code snippet*
>
>     String bootstrapServers = "localhost:9092";
>     String subscribeType = "subscribe";
>     String topics = "events";
>
>     StructType tweetSchema = new StructType()
>         .add("tweetId", "string")
>         .add("tweetText", "string")
>         .add("location", "string")
>         .add("timestamp", "string");
>
>     SparkSession spark = SparkSession
>         .builder()
>         .appName("StreamProcessor")
>         .config("spark.master", "local")
>         .getOrCreate();
>
>     Dataset<Tweet> streams = spark
>         .readStream()
>         .format("kafka")
>         .option("kafka.bootstrap.servers", bootstrapServers)
>         .option(subscribeType, topics)
>         .load()
>         .as(Encoders.bean(Tweet.class));
>
>     streams.createOrReplaceTempView("streamsData");
>
>     String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
>     Dataset<Row> countsByLocation = spark.sql(sql);
>
>     StreamingQuery query = countsByLocation.writeStream()
>         .outputMode("complete")
>         .format("console")
>         .start();
>
>     query.awaitTermination();
>
> -------------------------------------------------------------------------------------------
>
> *Tweet*
>
> Tweet.java - has a public constructor and getter / setter methods
>
>     public class Tweet implements Serializable {
>
>         private String tweetId;
>         private String tweetText;
>         private String location;
>         private String timestamp;
>
>         public Tweet(){
>         }
>         .............
>
> -------------------------------------------------------------------------------------------
>
> *pom.xml*
>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-core_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-streaming_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-sql_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>
> -------------------------------------------------------------------------------------------
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
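For completeness, a filled-out version of the truncated Tweet bean quoted above might look like the following. Only the field names come from the snippet; the rest is the standard JavaBean boilerplate that Encoders.bean expects, namely a public no-arg constructor plus a getter and setter per column:

```java
import java.io.Serializable;

// Sketch of the complete bean. Encoders.bean(Tweet.class) maps columns
// to properties by the getter/setter names, so each schema column
// (tweetId, tweetText, location, timestamp) needs a matching pair.
public class Tweet implements Serializable {

    private String tweetId;
    private String tweetText;
    private String location;
    private String timestamp;

    // Public no-arg constructor required by the bean encoder.
    public Tweet() {
    }

    public String getTweetId() { return tweetId; }
    public void setTweetId(String tweetId) { this.tweetId = tweetId; }

    public String getTweetText() { return tweetText; }
    public void setTweetText(String tweetText) { this.tweetText = tweetText; }

    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }

    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
}
```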