Hey Chris, You got it right. I'm reading a *csv *file from local as mentioned above, with a console producer on Kafka side.
So, as it is a csv data with headers, shall I then use from_csv on the spark side and provide a StructType to shape it up with a schema and then cast it to string as TD suggested? I'm getting all of your points at a very high level. A little more granularity would help. *In the slide TD just shared*, PFA, I'm confused at the point where he is casting the value as string. Logically, the value shall consist of all the entire data set, so, suppose, I've a table with many columns, *how can I provide a single alias as he did in the groupBy. I missed it there itself. Another question is, do I have to cast in groupBy itself? Can't I do it directly in a select query? The last one, if the steps are followed, can I then run a SQL query on top of the columns separately?* Thanks, Aakash. On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote: You need to tell Spark about the structure of the data, it doesn't know ahead of time if you put avro, json, protobuf, etc. in kafka for the message format. If the messages are in json, Spark provides from_json out of the box. For a very simple POC you can happily cast the value to a string, etc. if you are prototyping and pushing messages by hand with a console producer on the kafka side. ________________________________________ From: Aakash Basu <aakash.spark....@gmail.com> Sent: Thursday, March 15, 2018 7:52:28 AM To: Tathagata Das Cc: Dylan Guedes; Georg Heiler; user Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query Hi, And if I run this below piece of code - from pyspark.sql import SparkSession import time class test: spark = SparkSession.builder \ .appName("DirectKafka_Spark_Stream_Stream_Join") \ .getOrCreate() # ssc = StreamingContext(spark, 20) table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load()) table2_stream = ( spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load()) joined_Stream = table1_stream.join(table2_stream, "Id") # # joined_Stream.show() # query = table1_stream.writeStream.format("console").start().awaitTermination() # .queryName("table_A").format("memory") # spark.sql("select * from table_A").show() time.sleep(10) # sleep 20 seconds # query.stop() # query # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py I get the below error (in Spark 2.3.0) - Traceback (most recent call last): File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module> class test: File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test joined_Stream = table1_stream.join(table2_stream, "Id") File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/ lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/ lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/ lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];' Seems, as per the documentation, they key and value are deserialized as byte arrays. I am badly stuck at this step, not many materials online, with steps to proceed on this, too. Any help, guys? Thanks, Aakash. On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark....@gmail.com< mailto:aakash.spark....@gmail.com>> wrote: Any help on the above? On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com< mailto:aakash.spark....@gmail.com>> wrote: Hi, I progressed a bit in the above mentioned topic - 1) I am feeding a CSV file into the Kafka topic. 2) Feeding the Kafka topic as readStream as TD's article suggests. 3) Then, simply trying to do a show on the streaming dataframe, using queryName('XYZ') in the writeStream and writing a sql query on top of it, but that doesn't show anything. 4) Once all the above problems are resolved, I want to perform a stream-stream join. The CSV file I'm ingesting into Kafka has - id,first_name,last_name 1,Kellyann,Moyne 2,Morty,Blacker 3,Tobit,Robardley 4,Wilona,Kells 5,Reggy,Comizzoli My test code - from pyspark.sql import SparkSession import time class test: spark = SparkSession.builder \ .appName("DirectKafka_Spark_Stream_Stream_Join") \ .getOrCreate() # ssc = StreamingContext(spark, 20) table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load()) # table2_stream = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load()) # joined_Stream = table1_stream.join(table2_stream, "Id") # # joined_Stream.show() query = table1_stream.writeStream.format("console").queryName("table_A").start() # .format("memory") # spark.sql("select * from table_A").show() # time.sleep(10) # sleep 20 seconds # query.stop() query.awaitTermination() # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py The output I'm getting (whereas I simply want to show() my dataframe) - +----+--------------------+-----+---------+------+---------- ----------+-------------+ | key| value|topic|partition|offset| timestamp|timestampType| +----+--------------------+-----+---------+------+---------- ----------+-------------+ |null|[69 64 2C 66 69 7...|test1| 0| 5226|2018-03-15 15:48:...| 0| |null|[31 2C 4B 65 6C 6...|test1| 0| 5227|2018-03-15 15:48:...| 0| |null|[32 2C 4D 6F 72 7...|test1| 0| 5228|2018-03-15 15:48:...| 0| |null|[33 2C 54 6F 62 6...|test1| 0| 5229|2018-03-15 15:48:...| 0| |null|[34 2C 57 69 6C 6...|test1| 0| 5230|2018-03-15 15:48:...| 0| |null|[35 2C 52 65 67 6...|test1| 0| 5231|2018-03-15 15:48:...| 0| +----+--------------------+-----+---------+------+---------- ----------+-------------+ 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: { "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8", "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21", "name" : "table_A", "timestamp" : "2018-03-15T10:18:07.218Z", "numInputRows" : 6, "inputRowsPerSecond" : 461.53846153846155, "processedRowsPerSecond" : 14.634146341463415, "durationMs" : { "addBatch" : 241, "getBatch" : 15, "getOffset" : 2, "queryPlanning" : 2, "triggerExecution" : 410, "walCommit" : 135 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[test1]]", "startOffset" : { "test1" : { "0" : 5226 } }, "endOffset" : { "test1" : { "0" : 5232 } }, "numInputRows" : 6, "inputRowsPerSecond" : 461.53846153846155, "processedRowsPerSecond" : 14.634146341463415 } ], "sink" : { "description" : "org.apache.spark.sql.execution.streaming. ConsoleSink@3dfc7990" } } P.S - If I add the below piece in the code, it doesn't print a DF of the actual table. spark.sql("select * from table_A").show() Any help? Thanks, Aakash. On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com< mailto:aakash.spark....@gmail.com>> wrote: Thanks to TD, the savior! Shall look into it. On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <tathagata.das1...@gmail.com< mailto:tathagata.das1...@gmail.com>> wrote: Relevant: https://databricks.com/blog/2018/03/13/introducing-stream- stream-joins-in-apache-spark-2-3.html This is true stream-stream join which will automatically buffer delayed data and appropriately join stuff with SQL join semantics. Please check it out :) TD On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com<mailto: djmggue...@gmail.com>> wrote: I misread it, and thought that you question was if pyspark supports kafka lol. Sorry! On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark....@gmail.com< mailto:aakash.spark....@gmail.com>> wrote: Hey Dylan, Great! Can you revert back to my initial and also the latest mail? Thanks, Aakash. On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com<mailto:d jmggue...@gmail.com>> wrote: Hi, I've been using the Kafka with pyspark since 2.1. On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark....@gmail.com< mailto:aakash.spark....@gmail.com>> wrote: Hi, I'm yet to. Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package allows Python? I read somewhere, as of now Scala and Java are the languages to be used. Please correct me if am wrong. Thanks, Aakash. On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com<mailto: georg.kf.hei...@gmail.com>> wrote: Did you try spark 2.3 with structured streaming? There watermarking and plain sql might be really interesting for you. Aakash Basu <aakash.spark....@gmail.com<mailto:aakash.spark....@gmail.com>> schrieb am Mi. 14. März 2018 um 14:57: Hi, Info (Using): Spark Streaming Kafka 0.8 package Spark 2.2.1 Kafka 1.0.1 As of now, I am feeding paragraphs in Kafka console producer and my Spark, which is acting as a receiver is printing the flattened words, which is a complete RDD operation. My motive is to read two tables continuously (being updated) as two distinct Kafka topics being read as two Spark Dataframes and join them based on a key and produce the output. (I am from Spark-SQL background, pardon my Spark-SQL-ish writing) It may happen, the first topic is receiving new data 15 mins prior to the second topic, in that scenario, how to proceed? I should not lose any data. As of now, I want to simply pass paragraphs, read them as RDD, convert to DF and then join to get the common keys as the output. (Just for R&D). Started using Spark Streaming and Kafka today itself. Please help! Thanks, Aakash.