Queries with streaming sources must be executed with writeStream.start()
I am reading data from Kafka and want to save it to Parquet on HDFS with Structured Streaming. The data from Kafka is in JSON format. I tried to convert it to a Dataset with spark.read().json(), but I get this exception:

    Queries with streaming sources must be executed with writeStream.start()

Here is my code:

    Dataset<Row> df = spark.readStream().format("kafka")...
    Dataset<String> jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
    Dataset<Row> rowDataset = spark.read().json(jsonDataset);

    rowDataset.writeStream()
            .outputMode(OutputMode.Append())
            .partitionBy("appname")
            .format("parquet")
            .option("path", savePath)
            .start()
            .awaitTermination();

How can I solve this? Thanks!

Regards,
Junfeng Chen
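A sketch of the usual fix, under my own assumptions (the JSON schema, Kafka options, and output paths below are placeholders, not from the original post): spark.read().json() is a batch API and cannot be applied to a streaming Dataset, but the from_json() column function with an explicit schema is supported inside a streaming query.

```java
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class KafkaJsonToParquet {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-json-to-parquet").getOrCreate();

        // Hypothetical schema for the incoming JSON; adjust to your data.
        StructType schema = new StructType()
                .add("appname", DataTypes.StringType)
                .add("payload", DataTypes.StringType);

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
                .option("subscribe", "mytopic")                      // placeholder
                .load();

        // Parse the JSON inside the query plan instead of calling a batch API.
        Dataset<Row> rows = df
                .selectExpr("CAST(value AS STRING) AS json")
                .select(from_json(col("json"), schema).as("data"))
                .select("data.*");

        rows.writeStream()
                .outputMode(OutputMode.Append())
                .partitionBy("appname")
                .format("parquet")
                .option("path", "/tmp/out")                // placeholder savePath
                .option("checkpointLocation", "/tmp/ckpt") // required by file sinks
                .start()
                .awaitTermination();
    }
}
```

Note that the file sink requires a checkpointLocation; without it the query fails at start().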
Re: Queries with streaming sources must be executed with writeStream.start()
I have about 100 fields in my dataset, and some of them contain nulls. Does to_json fail to convert in that case?

Thanks!

On Tue, Sep 12, 2017 at 12:32 PM, kant kodali wrote:
> [quoted text trimmed]
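For what it's worth, a small batch-mode experiment suggests nulls are not the culprit: to_json does not fail on null fields, and in Spark 2.x a null-valued field is simply omitted from the generated JSON string. A minimal sketch (the Person bean and local session are mine, not from the thread):

```java
import static org.apache.spark.sql.functions.*;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ToJsonNulls {
    // Hypothetical bean standing in for a row with a nullable field.
    public static class Person implements java.io.Serializable {
        private String name;
        private Integer id;
        private String country; // may be null
        public Person() {}
        public Person(String name, Integer id, String country) {
            this.name = name; this.id = id; this.country = country;
        }
        public String getName() { return name; }
        public void setName(String v) { name = v; }
        public Integer getId() { return id; }
        public void setId(Integer v) { id = v; }
        public String getCountry() { return country; }
        public void setCountry(String v) { country = v; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]").appName("to_json-nulls").getOrCreate();

        Dataset<Row> df = spark.createDataFrame(
                Arrays.asList(new Person("kant", 1, null),
                              new Person("john", 2, "usa")),
                Person.class);

        // Rows with null fields still convert; the null field is dropped
        // from the resulting JSON string rather than causing a failure.
        df.select(to_json(struct(col("*"))).as("json")).show(false);
        spark.stop();
    }
}
```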
Re: Queries with streaming sources must be executed with writeStream.start()
Hi Michael,

Interestingly, that doesn't seem to quite work for me for some reason. Here is the Dataset I have:

    name | id | country
    -----|----|--------
    kant | 1  | usa
    john | 2  | usa

And here is my code:

    Dataset<Row> ds = getKafkaStream(); // this dataset represents the one above
    StreamingQuery query = ds.writeStream()
            .trigger(Trigger.ProcessingTime(1000))
            .format("console")
            .start();
    query.awaitTermination();

*This works completely fine and I can see the rows on my console.*

Now if I change it to this:

    Dataset<Row> ds = getKafkaStream(); // this dataset represents the one above
    Dataset<String> jsonDS = ds.select(to_json(struct(ds.col("*"))).as(Encoders.STRING()));
    StreamingQuery query2 = jsonDS.writeStream()
            .trigger(Trigger.ProcessingTime(1000))
            .format("console")
            .start();
    query2.awaitTermination();

*I don't see any rows on my console, and I made sure I waited for a while.*

*The moment I change it back to the code above and run it, it works again.*

On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust wrote:
> [quoted text trimmed]
Re: Queries with streaming sources must be executed with writeStream.start()
The following will convert the whole row to JSON.

    import static org.apache.spark.sql.functions.*;

    df.select(to_json(struct(col("*"))));

On Sat, Sep 9, 2017 at 6:27 PM, kant kodali wrote:
> [quoted text trimmed]
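Expanded into a self-contained Java form (the helper class, the static import, and the Encoders.STRING() conversion are how I would wire it up, not Michael's exact code):

```java
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

public final class RowToJson {
    // Works for batch and streaming Datasets alike: struct(col("*")) packs
    // every column of the row into a single struct, and to_json serializes
    // that struct into one JSON string per row. The typed .as(Encoders.STRING())
    // turns the single-column result into a Dataset<String>.
    public static Dataset<String> toJsonColumn(Dataset<Row> df) {
        return df.select(to_json(struct(col("*"))).as(Encoders.STRING()));
    }
}
```

Because to_json is an ordinary column expression, the result can be written with writeStream() like any other streaming Dataset.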
Re: Queries with streaming sources must be executed with writeStream.start()
Thanks Ryan! In this case I will have a Dataset<Row>, so is there a way to convert a Row to a JSON string?

Thanks

On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
> [quoted text trimmed]
Re: Queries with streaming sources must be executed with writeStream.start()
It's because "toJSON" doesn't support Structured Streaming. The current implementation converts the Dataset to an RDD, which is not supported by streaming queries.

On Sat, Sep 9, 2017 at 4:40 PM, kant kodali wrote:
> [quoted text trimmed]
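In other words, toJSON is planned through an RDD conversion, which streaming queries reject, while to_json is a column expression evaluated row by row inside the query plan, so it is legal on a stream. A sketch of the contrast, with placeholder Kafka options of my own:

```java
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class ToJsonOnStream {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("to_json-on-stream").getOrCreate();

        Dataset<Row> stream = spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
                .option("subscribe", "mytopic")                      // placeholder
                .load();

        // stream.toJSON() would fail here: internally it goes through an RDD,
        // which triggers "Queries with streaming sources must be executed
        // with writeStream.start()".

        // to_json stays inside the query plan, so this streams fine:
        Dataset<String> json = stream
                .select(to_json(struct(col("*"))).as(Encoders.STRING()));

        StreamingQuery query = json.writeStream().format("console").start();
        query.awaitTermination();
    }
}
```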
Re: Queries with streaming sources must be executed with writeStream.start()
Yes, it is a streaming dataset. So what is the problem with the following code?

    Dataset<String> ds = dataset.toJSON().map(() -> {some function that returns a string});
    StreamingQuery query = ds.writeStream().start();
    query.awaitTermination();

On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung wrote:
> [quoted text trimmed]
Re: Queries with streaming sources must be executed with writeStream.start()
What is newDS? If it is a streaming Dataset/DataFrame (since you have writeStream there), then there seems to be an issue preventing toJSON from working.

------------------------------
From: kant kodali
Sent: Saturday, September 9, 2017 4:04:33 PM
To: user @spark
Subject: Queries with streaming sources must be executed with writeStream.start()

[quoted text trimmed]
Queries with streaming sources must be executed with writeStream.start()
Hi All,

I have the following code and I am not sure what's wrong with it. I cannot call dataset.toJSON() (which returns a Dataset<String>)? I am using Spark 2.2.0, so I am wondering if there is any workaround?

    Dataset<String> ds = newDS.toJSON().map(() -> {some function that returns a string});
    StreamingQuery query = ds.writeStream().start();
    query.awaitTermination();