Also, is query.stop() a graceful stop operation? What happens to data that has
already been received: will it still be processed?

On Tue, Aug 15, 2017 at 7:21 PM purna pradeep <purna2prad...@gmail.com>
wrote:

> Ok thanks
>
> A few more questions:
>
> 1. The documentation says onQueryProgress is not thread-safe. Would this
> method still be the right place to refresh the cache? And is there no need
> to restart the query if I choose the listener?
>
> The methods are not thread-safe as they may be called from different
> threads.
>
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
>
>
>
> 2. If I use StreamingQueryListener.onQueryProgress, my understanding is that
> the method is executed only while the query is in progress. If I refresh
> the data frame there without restarting the query, will it impact the
> application?
>
> 3. Should I use the blocking unpersist(blocking: Boolean) method or the
> async unpersist() method, given that the data size is big?
>
> I feel your solution is better, as it stops the query --> refreshes the
> cache --> starts the query, if I can accept a little downtime, even though
> the cached data frame is huge. I'm not sure how the listener behaves, since
> it's asynchronous; correct me if I'm wrong.
>
> On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Both work. The asynchronous method with the listener will have less
>> downtime; it's just that the first trigger/batch after the asynchronous
>> unpersist+persist will probably take longer, as it has to reload the data.
>>
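
The listener-based refresh described above could be guarded roughly as follows. This is only a sketch under assumptions: `Cacheable` and `CacheRefresher` are hypothetical names, and `Cacheable` is a stand-in for the cached DataFrame so the control flow can run without a cluster (Spark's `Dataset` does expose `unpersist(blocking: Boolean)` and `persist()`). The idea is that `maybeRefresh()` would be called from `onQueryProgress`; the `AtomicBoolean` addresses the note that listener methods may be called from different threads.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the cached DataFrame (assumption, not a Spark type).
trait Cacheable {
  def unpersist(blocking: Boolean): Unit
  def persist(): Unit
}

// Hypothetical helper intended to be invoked from a listener callback.
final class CacheRefresher(cached: Cacheable, shouldRefresh: () => Boolean) {
  // Prevents two callbacks on different threads from refreshing concurrently.
  private val refreshing = new AtomicBoolean(false)

  /** Returns true if this call performed the unpersist + persist. */
  def maybeRefresh(): Boolean =
    if (shouldRefresh() && refreshing.compareAndSet(false, true)) {
      try {
        cached.unpersist(blocking = false) // do not wait for cached blocks to be dropped
        cached.persist()                   // lazy: the next job that uses the data reloads it
        true
      } finally refreshing.set(false)
    } else false
}
```

Because `persist()` is lazy, the reload cost lands on the first trigger after the refresh, which matches the longer first batch mentioned above.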
>>
>> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Thanks Tathagata Das. Actually, I'm planning to do something like this:
>>>
>>> activeQuery.stop()
>>>
>>> // unpersist and persist the cached data frame
>>> df.unpersist()
>>>
>>> // read the updated data (the data size of df is around 100 GB)
>>> df.persist()
>>>
>>> activeQuery = startQuery()
>>>
>>>
>>> The cached data frame is around 100 GB, so the question is whether this
>>> is the right place to refresh such a huge cached data frame.
>>>
>>> I'm also trying to refresh the cached data frame in the onQueryProgress()
>>> method of a class that extends StreamingQueryListener.
>>>
>>> I would like to know which is the best place to refresh the cached data
>>> frame, and why.
>>>
>>> Thanks again for the response below.
>>>
>>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> You can do something like this.
>>>>
>>>>
>>>> def startQuery(): StreamingQuery = {
>>>>    // create your streaming dataframes
>>>>    // start the query with the same checkpoint directory
>>>> }
>>>>
>>>> // handle to the active query
>>>> var activeQuery: StreamingQuery = null
>>>>
>>>> while (!stopped) {
>>>>
>>>>    if (activeQuery == null) {     // if query not active, start query
>>>>      activeQuery = startQuery()
>>>>
>>>>    } else if (shouldRestartQuery()) {      // check your condition and restart query
>>>>      activeQuery.stop()
>>>>      activeQuery = startQuery()
>>>>    }
>>>>
>>>>    activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>>    // if there is any error it will throw exception and quit the loop
>>>>    // otherwise it will keep checking the condition every 100ms
>>>> }
>>>>
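
One way to fold the cache refresh into the restart loop above is sketched here, as stop, then refresh, then start. It is an illustration under assumptions, not a definitive implementation: `QueryHandle` and `RestartLoop` are hypothetical names, and `QueryHandle` only mirrors the two `StreamingQuery` methods the loop uses (`stop()` and `awaitTermination(timeoutMs: Long): Boolean`) so that the logic can run without Spark. `refreshCache` is a caller-supplied function that would hold the unpersist/reload/persist step.

```scala
// Hypothetical stand-in for StreamingQuery (assumption, not a Spark type).
trait QueryHandle {
  def stop(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
}

// Hypothetical supervisor: one call to step() is one iteration of the loop.
final class RestartLoop(
    startQuery: () => QueryHandle,
    shouldRestart: () => Boolean,
    refreshCache: () => Unit) {

  private var active: Option[QueryHandle] = None

  def step(pollMs: Long): Unit = {
    active match {
      case None =>
        // no active query yet: start one
        active = Some(startQuery())
      case Some(query) if shouldRestart() =>
        query.stop()      // stop the running query first
        refreshCache()    // refresh while no query is using the cached data
        active = Some(startQuery())
      case _ => ()        // query is running and no restart is requested
    }
    // wait briefly; with a real StreamingQuery this call also surfaces query errors
    active.foreach(_.awaitTermination(pollMs))
  }
}
```

A driver would call `step(100)` inside `while (!stopped)`. Running the refresh between `stop()` and `startQuery()` trades some downtime for the guarantee that no batch runs against a half-refreshed cache.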
>>>>
>>>>
>>>>
>>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com
>>>> > wrote:
>>>>
>>>>> Thanks Michael
>>>>>
>>>>> I guess my question was a little confusing; let me try again.
>>>>>
>>>>>
>>>>> I would like to restart a streaming query programmatically, based on a
>>>>> condition, while my streaming application is running. Here is why I want
>>>>> to do this:
>>>>>
>>>>> I want to refresh a cached data frame based on a condition, and the
>>>>> best way to do this is to restart the streaming query, as suggested by
>>>>> Tdas below for a similar problem:
>>>>>
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>>
>>>>> I do understand that checkpointing helps with recovery from failures, but
>>>>> I would like to know "how to restart a streaming query programmatically
>>>>> without stopping my streaming application".
>>>>>
>>>>> In place of query.awaitTermination, do I need some logic to restart the
>>>>> query? Please suggest.
>>>>>
>>>>>
>>>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> See
>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>>>
>>>>>> Though I think that this currently doesn't work with the console sink.
>>>>>>
>>>>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <
>>>>>> purna2prad...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>>
>>>>>>>> I'm trying to restart a streaming query to refresh a cached data
>>>>>>>> frame.
>>>>>>>>
>>>>>>>> Where and how should I restart the streaming query?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> import org.apache.spark.sql.SparkSession
>>>>>>>
>>>>>>> val sparkSes = SparkSession
>>>>>>>   .builder
>>>>>>>   .config("spark.master", "local")
>>>>>>>   .appName("StreamingCahcePoc")
>>>>>>>   .getOrCreate()
>>>>>>>
>>>>>>> import sparkSes.implicits._
>>>>>>>
>>>>>>> // streamSchema is defined elsewhere (not shown in this message)
>>>>>>> val dataDF = sparkSes.readStream
>>>>>>>   .schema(streamSchema)
>>>>>>>   .csv("testData")
>>>>>>>
>>>>>>> // counts is defined elsewhere (not shown in this message)
>>>>>>> val query = counts.writeStream
>>>>>>>   .outputMode("complete")
>>>>>>>   .format("console")
>>>>>>>   .start()
>>>>>>>
>>>>>>> query.awaitTermination()
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>
