Encoders can only map data into an object if those columns already exist.
When we are reading from Kafka, we just get a binary blob and you'll need
to help Spark parse that first.  Assuming your data is stored in JSON it
should be pretty straight forward.

streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .withColumn("message", from_json(col("value").cast("string"),
tweetSchema)) // cast the binary value to a string and parse it as json
  .select("message.*") // unnest the json
  .as(Encoders.bean(Tweet.class)) // only required if you want to use
lambda functions on the data using this class

Here is some more info on working with JSON and other semi-structured
formats
<https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
.

On Fri, Mar 24, 2017 at 10:49 AM, kaniska <kaniska.man...@gmail.com> wrote:

> Hi,
>
> Currently , encountering the following exception while working with
> below-mentioned code snippet :
>
> > Please suggest the correct approach for reading the stream into a sql
> > schema.
> > If I add 'tweetSchema' while reading stream, it errors out with message -
> > we can not change static schema for kafka.
>
> ------------------------------------------------------------
> -------------------------------
>
> *exception*
>
> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
> '`location`' given input columns: [topic, timestamp, key, offset, value,
> timestampType, partition]*;
>         at
> org.apache.spark.sql.catalyst.analysis.package$
> AnalysisErrorAt.failAnalysis(package.scala:42)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(
> CheckAnalysis.scala:77)
> ------------------------------------------------------------
> --------------------------------------------
>
> *structured streaming code snippet*
>
> String bootstrapServers = "localhost:9092";
>             String subscribeType = "subscribe";
>             String topics = "events";
>
>             StructType tweetSchema = new StructType()
>                 .add("tweetId", "string")
>                 .add("tweetText", "string")
>                 .add("location", "string")
>                 .add("timestamp", "string");
>
>            SparkSession spark = SparkSession
>                               .builder()
>                               .appName("StreamProcessor")
>                               .config("spark.master", "local")
>                               .getOrCreate();
>
>           Dataset<Tweet> streams = spark
>                                       .readStream()
>                                       .format("kafka")
>                                       .option("kafka.bootstrap.servers",
> bootstrapServers)
>                                       .option(subscribeType, topics)
>                                       .load()
>                                       .as(Encoders.bean(Tweet.class));
>
>          streams.createOrReplaceTempView("streamsData");
>
>                    String sql = "SELECT location,  COUNT(*) as count FROM
> streamsData
> GROUP BY location";
>                    Dataset<Row> countsByLocation = spark.sql(sql);
>
>                     StreamingQuery query = countsByLocation.writeStream()
>                       .outputMode("complete")
>                       .format("console")
>                       .start();
>
>                     query.awaitTermination();
> ------------------------------------------------------------
> --------------------------------------
>
> *Tweet *
>
> Tweet.java - has public constructor and getter / setter methods
>
> public class Tweet implements Serializable{
>
>         private String tweetId;
>         private String tweetText;
>         private String location;
>         private String timestamp;
>
>         public Tweet(){
>
>         }
> .............
>
> ------------------------------------------------------------
> ----------------------------
>
> *pom.xml *
>
>
>                 <dependency>
>                         <groupId>org.apache.spark</groupId>
>                         <artifactId>spark-core_2.10</artifactId>
>                         <version>2.1.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.spark</groupId>
>                         <artifactId>spark-streaming_2.10</artifactId>
>                         <version>2.1.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.spark</groupId>
>                         <artifactId>spark-streaming-
> kafka-0-8_2.10</artifactId>
>                         <version>2.1.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.spark</groupId>
>                         <artifactId>spark-sql_2.10</artifactId>
>                         <version>2.1.0</version>
>                 </dependency>
>                 <dependency>
>                 <groupId>org.apache.spark</groupId>
>                 <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>                 <version>2.1.0</version>
>                 </dependency>
> ------------------------------------------------------------
> ------------------------
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

Reply via email to