Queries with streaming sources must be executed with writeStream.start();;

2018-03-27 Thread Junfeng Chen
I am reading some data from Kafka and want to save it to Parquet on
HDFS with Structured Streaming.
The data from Kafka is in JSON format. I try to convert it to a
Dataset with spark.read.json(). However, I get the exception:
>
> Queries with streaming sources must be executed with
> writeStream.start()

Here is my code:
>
> Dataset<Row> df = spark.readStream().format("kafka")...
> Dataset<String> jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
> Dataset<Row> rowDataset = spark.read().json(jsonDataset);
>
> rowDataset.writeStream().outputMode(OutputMode.Append()).partitionBy("appname").format("parquet").option("path",savePath).start().awaitTermination();



How to solve it?

Thanks!

Regards,
Junfeng Chen
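[Editor's note] The usual cause of this error is mixing the batch reader with a streaming Dataset: spark.read().json(jsonDataset) plans a batch query over a streaming source, which is exactly what the exception forbids. One way around it is to declare the JSON schema up front and parse with from_json, so the whole query stays on the streaming plan. The sketch below is untested and its schema fields, broker address, topic, and paths are placeholders, not taken from the original mail:

```java
// Sketch: parse the Kafka JSON with from_json + an explicit schema,
// instead of calling the batch API spark.read().json() on a stream.
// Schema fields, server, topic, and paths below are placeholders.
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("appname", DataTypes.StringType)
    .add("payload", DataTypes.StringType);

Dataset<Row> kafka = spark.readStream().format("kafka")
    .option("kafka.bootstrap.servers", "host:9092") // placeholder
    .option("subscribe", "mytopic")                 // placeholder
    .load();

// CAST the Kafka value to a string, parse it, then flatten the struct.
Dataset<Row> rows = kafka
    .selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), schema).as("data"))
    .select("data.*");

rows.writeStream()
    .outputMode(OutputMode.Append())
    .partitionBy("appname")
    .format("parquet")
    .option("path", "/path/on/hdfs")                 // placeholder
    .option("checkpointLocation", "/path/checkpoint") // placeholder
    .start()
    .awaitTermination();
```

This way writeStream().start() remains the only sink, and no batch read ever touches the streaming source.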


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-12 Thread kant kodali
I have about 100 fields in my dataset and some of them are null.
Does to_json fail to convert in that case?

Thanks!

On Tue, Sep 12, 2017 at 12:32 PM, kant kodali  wrote:

> Hi Michael,
>
> Interestingly that doesn't seem to quite work for me for some reason. Here
> is what I have
>
> Dataset
>
> name | id | country
> -----|----|--------
> kant | 1  | usa
> john | 2  | usa
>
>
> And here is my code
>
> Dataset ds = getKafkaStream(); // This dataset represents the one above
> StreamingQuery query = 
> ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query.awaitTermination();
>
> *This works completely fine and I can see the rows on my console.*
>
> Now if I change it to this.
>
> Dataset ds = getKafkaStream(); // This dataset represents the one above
> Dataset<String> jsonDS =
> ds.select(to_json(struct(ds.col("*"))).as(Encoders.STRING()));
> StreamingQuery query2 = 
> jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query2.awaitTermination();
>
> *I don't see any rows on my console, and I made sure I waited for a while.*
>
> *The moment I change it back to the code above, it works again.*
>
> On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust 
> wrote:
>
>> The following will convert the whole row to JSON.
>>
>> import org.apache.spark.sql.functions.*
>> df.select(to_json(struct(col("*"))))
>>
>> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali  wrote:
>>
>>> Thanks Ryan! In this case, I will have Dataset so is there a way to
>>> convert Row to Json string?
>>>
>>> Thanks
>>>
>>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 It's because "toJSON" doesn't support Structured Streaming. The current
 implementation will convert the Dataset to an RDD, which is not supported
 by streaming queries.

 On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:

> yes it is a streaming dataset. so what is the problem with following
> code?
>
> Dataset ds = dataset.toJSON().map(()->{some function that returns 
> a string});
>  StreamingQuery query = ds.writeStream().start();
>  query.awaitTermination();
>
>
> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung <
> felixcheun...@hotmail.com> wrote:
>
>> What is newDS?
>> If it is a Streaming Dataset/DataFrame (since you have writeStream
>> there) then there seems to be an issue preventing toJSON to work.
>>
>> --
>> *From:* kant kodali 
>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>> *To:* user @spark
>> *Subject:* Queries with streaming sources must be executed with
>> writeStream.start()
>>
>> Hi All,
>>
>> I  have the following code and I am not sure what's wrong with it? I
>> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
>> 2.2.0 so I am wondering if there is any work around?
>>
>>  Dataset ds = newDS.toJSON().map(()->{some function that returns 
>> a string});
>>  StreamingQuery query = ds.writeStream().start();
>>  query.awaitTermination();
>>
>>
>

>>>
>>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-12 Thread kant kodali
Hi Michael,

Interestingly that doesn't seem to quite work for me for some reason. Here
is what I have

Dataset

name | id | country
-----|----|--------
kant | 1  | usa
john | 2  | usa


And here is my code

Dataset ds = getKafkaStream(); // This dataset represents the one above
StreamingQuery query =
ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query.awaitTermination();

*This works completely fine and I can see the rows on my console.*

Now if I change it to this.

Dataset ds = getKafkaStream(); // This dataset represents the one above
Dataset<String> jsonDS =
ds.select(to_json(struct(ds.col("*"))).as(Encoders.STRING()));
StreamingQuery query2 =
jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query2.awaitTermination();

*I don't see any rows on my console, and I made sure I waited for a while.*

*The moment I change it back to the code above, it works again.*

On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust 
wrote:

> The following will convert the whole row to JSON.
>
> import org.apache.spark.sql.functions.*
> df.select(to_json(struct(col("*"))))
>
> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali  wrote:
>
>> Thanks Ryan! In this case, I will have Dataset so is there a way to
>> convert Row to Json string?
>>
>> Thanks
>>
>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> It's because "toJSON" doesn't support Structured Streaming. The current
>>> implementation will convert the Dataset to an RDD, which is not supported
>>> by streaming queries.
>>>
>>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:
>>>
 yes it is a streaming dataset. so what is the problem with following
 code?

 Dataset ds = dataset.toJSON().map(()->{some function that returns 
 a string});
  StreamingQuery query = ds.writeStream().start();
  query.awaitTermination();


 On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung  wrote:

> What is newDS?
> If it is a Streaming Dataset/DataFrame (since you have writeStream
> there) then there seems to be an issue preventing toJSON to work.
>
> --
> *From:* kant kodali 
> *Sent:* Saturday, September 9, 2017 4:04:33 PM
> *To:* user @spark
> *Subject:* Queries with streaming sources must be executed with
> writeStream.start()
>
> Hi All,
>
> I  have the following code and I am not sure what's wrong with it? I
> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
> 2.2.0 so I am wondering if there is any work around?
>
>  Dataset ds = newDS.toJSON().map(()->{some function that returns 
> a string});
>  StreamingQuery query = ds.writeStream().start();
>  query.awaitTermination();
>
>

>>>
>>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-11 Thread Michael Armbrust
The following will convert the whole row to JSON.

import org.apache.spark.sql.functions.*
df.select(to_json(struct(col("*"))))
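[Editor's note] In Java the same expression needs an explicit encoder to get a Dataset<String> back. A possible rendering, assuming df is the streaming Dataset<Row>:

```java
// Collapse all columns of each row into a single JSON string column.
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

Dataset<String> json = df
    .select(to_json(struct(col("*"))).as("value"))
    .as(Encoders.STRING());
```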

On Sat, Sep 9, 2017 at 6:27 PM, kant kodali  wrote:

> Thanks Ryan! In this case, I will have Dataset so is there a way to
> convert Row to Json string?
>
> Thanks
>
> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> It's because "toJSON" doesn't support Structured Streaming. The current
>> implementation will convert the Dataset to an RDD, which is not supported
>> by streaming queries.
>>
>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:
>>
>>> yes it is a streaming dataset. so what is the problem with following
>>> code?
>>>
>>> Dataset ds = dataset.toJSON().map(()->{some function that returns a 
>>> string});
>>>  StreamingQuery query = ds.writeStream().start();
>>>  query.awaitTermination();
>>>
>>>
>>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung 
>>> wrote:
>>>
 What is newDS?
 If it is a Streaming Dataset/DataFrame (since you have writeStream
 there) then there seems to be an issue preventing toJSON to work.

 --
 *From:* kant kodali 
 *Sent:* Saturday, September 9, 2017 4:04:33 PM
 *To:* user @spark
 *Subject:* Queries with streaming sources must be executed with
 writeStream.start()

 Hi All,

 I  have the following code and I am not sure what's wrong with it? I
 cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
 2.2.0 so I am wondering if there is any work around?

  Dataset ds = newDS.toJSON().map(()->{some function that returns a 
 string});
  StreamingQuery query = ds.writeStream().start();
  query.awaitTermination();


>>>
>>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-09 Thread kant kodali
Thanks Ryan! In this case I will have a Dataset<Row>, so is there a way to
convert a Row to a JSON string?

Thanks

On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu 
wrote:

> It's because "toJSON" doesn't support Structured Streaming. The current
> implementation will convert the Dataset to an RDD, which is not supported
> by streaming queries.
>
> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:
>
>> yes it is a streaming dataset. so what is the problem with following code?
>>
>> Dataset ds = dataset.toJSON().map(()->{some function that returns a 
>> string});
>>  StreamingQuery query = ds.writeStream().start();
>>  query.awaitTermination();
>>
>>
>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung 
>> wrote:
>>
>>> What is newDS?
>>> If it is a Streaming Dataset/DataFrame (since you have writeStream
>>> there) then there seems to be an issue preventing toJSON to work.
>>>
>>> --
>>> *From:* kant kodali 
>>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>>> *To:* user @spark
>>> *Subject:* Queries with streaming sources must be executed with
>>> writeStream.start()
>>>
>>> Hi All,
>>>
>>> I  have the following code and I am not sure what's wrong with it? I
>>> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
>>> 2.2.0 so I am wondering if there is any work around?
>>>
>>>  Dataset ds = newDS.toJSON().map(()->{some function that returns a 
>>> string});
>>>  StreamingQuery query = ds.writeStream().start();
>>>  query.awaitTermination();
>>>
>>>
>>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-09 Thread Shixiong(Ryan) Zhu
It's because "toJSON" doesn't support Structured Streaming. The current
implementation will convert the Dataset to an RDD, which is not supported
by streaming queries.
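
[Editor's note] A streaming-safe rewrite of the dataset.toJSON().map(...) pattern, sketched from the advice in this thread: to_json over struct(col("*")) stays inside the Dataset API and never drops to an RDD. Here transform() stands in for the user's unspecified "some function that returns a string" and is hypothetical:

```java
// Row -> JSON string without toJSON(), followed by the user's own map step.
import static org.apache.spark.sql.functions.*;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

Dataset<String> ds = dataset
    .select(to_json(struct(col("*"))).as("value"))
    .as(Encoders.STRING())
    .map((MapFunction<String, String>) json -> transform(json), // transform() is a placeholder
         Encoders.STRING());

ds.writeStream().format("console").start().awaitTermination();
```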

On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:

> yes it is a streaming dataset. so what is the problem with following code?
>
> Dataset ds = dataset.toJSON().map(()->{some function that returns a 
> string});
>  StreamingQuery query = ds.writeStream().start();
>  query.awaitTermination();
>
>
> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung 
> wrote:
>
>> What is newDS?
>> If it is a Streaming Dataset/DataFrame (since you have writeStream there)
>> then there seems to be an issue preventing toJSON to work.
>>
>> --
>> *From:* kant kodali 
>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>> *To:* user @spark
>> *Subject:* Queries with streaming sources must be executed with
>> writeStream.start()
>>
>> Hi All,
>>
>> I  have the following code and I am not sure what's wrong with it? I
>> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
>> 2.2.0 so I am wondering if there is any work around?
>>
>>  Dataset ds = newDS.toJSON().map(()->{some function that returns a 
>> string});
>>  StreamingQuery query = ds.writeStream().start();
>>  query.awaitTermination();
>>
>>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-09 Thread kant kodali
yes it is a streaming dataset. so what is the problem with following code?

Dataset ds = dataset.toJSON().map(()->{some function that
returns a string});
 StreamingQuery query = ds.writeStream().start();
 query.awaitTermination();


On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung 
wrote:

> What is newDS?
> If it is a Streaming Dataset/DataFrame (since you have writeStream there)
> then there seems to be an issue preventing toJSON to work.
>
> --
> *From:* kant kodali 
> *Sent:* Saturday, September 9, 2017 4:04:33 PM
> *To:* user @spark
> *Subject:* Queries with streaming sources must be executed with
> writeStream.start()
>
> Hi All,
>
> I  have the following code and I am not sure what's wrong with it? I
> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
> 2.2.0 so I am wondering if there is any work around?
>
>  Dataset ds = newDS.toJSON().map(()->{some function that returns a 
> string});
>  StreamingQuery query = ds.writeStream().start();
>  query.awaitTermination();
>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-09 Thread Felix Cheung
What is newDS?
If it is a Streaming Dataset/DataFrame (since you have writeStream there) then 
there seems to be an issue preventing toJSON to work.


From: kant kodali 
Sent: Saturday, September 9, 2017 4:04:33 PM
To: user @spark
Subject: Queries with streaming sources must be executed with 
writeStream.start()

Hi All,

I have the following code and I am not sure what's wrong with it. Why can I not 
call dataset.toJSON() (which returns a Dataset<String>)? I am using Spark 2.2.0, so I 
am wondering if there is a workaround.


 Dataset ds = newDS.toJSON().map(()->{some function that returns a 
string});
 StreamingQuery query = ds.writeStream().start();
 query.awaitTermination();


Queries with streaming sources must be executed with writeStream.start()

2017-09-09 Thread kant kodali
Hi All,

I have the following code and I am not sure what's wrong with it. Why can I not
call dataset.toJSON() (which returns a Dataset<String>)? I am using Spark 2.2.0, so
I am wondering if there is a workaround.

 Dataset ds = newDS.toJSON().map(()->{some function that
returns a string});
 StreamingQuery query = ds.writeStream().start();
 query.awaitTermination();