Re: [Structured Streaming] Restarting streaming query on exception/termination

2018-04-23 Thread Priyank Shrivastava
Thanks for the reply, formice.  I think the --supervise param helps to
restart the whole Spark application - what I want is to restart only the
structured streaming query that terminated due to an error.
Also, I am running my app in client mode.

Thanks,
Priyank

On Sun, Apr 22, 2018 at 8:52 PM, formice <51296...@qq.com> wrote:

> standalone
>   add  config: (1) --deploy-mode cluster (2) --supervise
>   example:  spark-submit --master spark://master:7077 --deploy-mode
> cluster --supervise ..
>
>
> -- Original Message ------
> *From:* "Priyank Shrivastava"<priya...@gmail.com>;
> *Sent:* Saturday, April 21, 2018, 5:45 AM
> *To:* "user"<user@spark.apache.org>;
> *Subject:* [Structured Streaming] Restarting streaming query on
> exception/termination
>
> What's the right way of programmatically restarting a structured streaming
> query which has terminated due to an exception? Example code or reference
> would be appreciated.
>
> Could it be done from within the onQueryTerminated() event handler of
> StreamingQueryListener class?
>
> Priyank
>
>


[Structured Streaming] Restarting streaming query on exception/termination

2018-04-20 Thread Priyank Shrivastava
What's the right way of programmatically restarting a structured streaming
query which has terminated due to an exception? Example code or reference
would be appreciated.

Could it be done from within the onQueryTerminated() event handler of
StreamingQueryListener class?

Priyank
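
One pattern that seems to work (a minimal sketch, assuming a driver-side retry
loop around awaitTermination() rather than restarting from inside
onQueryTerminated(), since the listener callback runs on a separate thread; the
rate source, console sink, and paths below are stand-ins):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryException

object RestartOnFailure {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("restart-on-failure-sketch").getOrCreate()

    // (Re)start the query; the checkpoint lets each restart resume where it left off.
    def startQuery() = spark.readStream
      .format("rate")                // stand-in source
      .load()
      .writeStream
      .format("console")             // stand-in sink
      .option("checkpointLocation", "/tmp/checkpoints/restartable-query")
      .start()

    var keepRunning = true
    while (keepRunning) {
      val query = startQuery()
      try {
        query.awaitTermination()     // returns normally only if query.stop() was called
        keepRunning = false
      } catch {
        case e: StreamingQueryException =>
          println(s"Query failed, restarting: ${e.getMessage}")
          Thread.sleep(5000)         // back-off before restarting
      }
    }
  }
}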


[Structured Streaming] Application Updates in Production

2018-03-21 Thread Priyank Shrivastava
I am using Structured Streaming with Spark 2.2.  We are using Kafka as our
source and are using checkpoints for failure recovery and e2e exactly once
guarantees.  I would like to get some more information on how to handle
updates to the application when there is a change in stateful operations
and/or output schema.

As some of the sources suggest, I can start the updated application in
parallel with the old application until it catches up with the old
application in terms of data, and then kill the old one.  But then the new
application will have to re-read/re-process all the data in Kafka, which
could take a long time.

I want to AVOID this re-processing of the data in the newly deployed
updated application.

One way I can think of is for the application to keep writing the offsets
to a location outside the checkpoint directory, for example ZooKeeper or
HDFS.  Then, on an update of the application, I point the Kafka
readStream() at the offsets stored in this new location
(ZooKeeper/HDFS) - since the updated application can't read from the
checkpoint directory, which is now deemed incompatible.
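
A minimal sketch of this idea, assuming the offsets are kept in a plain file and
fed back through the Kafka source's startingOffsets option; the paths, broker
address, and topic name are hypothetical:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

object OffsetTrackingSketch {
  val offsetFile = Paths.get("/tmp/offset-store/latest-offsets.json") // hypothetical location

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("offset-tracking-sketch").getOrCreate()

    // 1. Record the latest processed Kafka offsets outside the checkpoint directory.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(e: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(e: QueryProgressEvent): Unit = {
        // For the Kafka source, endOffset is a JSON string such as {"events":{"0":42}}.
        e.progress.sources.headOption.foreach { source =>
          Files.createDirectories(offsetFile.getParent)
          Files.write(offsetFile, source.endOffset.getBytes(StandardCharsets.UTF_8))
        }
      }
    })

    // 2. When an upgrade invalidates the old checkpoint, seed the new query from the
    //    externally stored offsets instead of "latest"/"earliest".
    val startingOffsets =
      if (Files.exists(offsetFile))
        new String(Files.readAllBytes(offsetFile), StandardCharsets.UTF_8)
      else "earliest"

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // hypothetical brokers
      .option("subscribe", "events")                    // hypothetical topic
      .option("startingOffsets", startingOffsets)
      .load()
      .writeStream
      .format("console")                                // stand-in sink
      .option("checkpointLocation", "/tmp/checkpoints/upgraded-app") // fresh checkpoint dir
      .start()
      .awaitTermination()
  }
}

Note that the recorded endOffset is only as current as the last completed batch,
so a few records may be reprocessed after an upgrade.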

So a couple of questions:
1.  Is the above-stated solution a valid solution?
2.  If yes, how can I automate the detection of whether the application is
being restarted because of a failure/maintenance or because of code changes
to stateful operations and/or output schema?

Any guidance, example or information source is appreciated.

Thanks,
Priyank


[Structured Streaming] Avoiding multiple streaming queries

2018-02-12 Thread Priyank Shrivastava
I have a structured streaming query which sinks to Kafka.  This query has a
complex aggregation logic.


I would like to sink the output DF of this query to multiple Kafka topics,
each partitioned on a different ‘key’ column.  I don’t want to have a
separate Kafka sink for each of the different Kafka topics, because that
would mean running multiple streaming queries - one for each Kafka topic -
especially since my aggregation logic is complex.


Questions:

1.  Is there a way to output the results of a structured streaming query to
multiple Kafka topics each with a different key column but without having
to execute multiple streaming queries?


2.  If not, would it be efficient to cascade the queries, such that the
first query does the complex aggregation and writes its output to Kafka,
and the other queries just read the output of the first query and write
their topics to Kafka, thus avoiding doing the complex aggregation again?
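
On question 1, one hedged sketch: compute the aggregation once and route each
row by an exploded (topic, key) mapping, relying on the Kafka sink's per-row
"topic" column (used when no "topic" option is set on the sink). Topic names,
brokers, and columns below are made up:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object MultiTopicFanOut {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-topic-fanout-sketch").getOrCreate()
    import spark.implicits._

    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "input-events")
      .load()

    // Stand-in for the "complex aggregation logic": count events per user and region.
    val aggregated = source
      .select(get_json_object($"value".cast("string"), "$.userId").as("userId"),
              get_json_object($"value".cast("string"), "$.region").as("region"))
      .groupBy($"userId", $"region")
      .count()

    // One output row per (topic, key) pair, so the aggregation runs only once.
    val fannedOut = aggregated
      .withColumn("route", explode(array(
        struct(lit("agg-by-user").as("topic"), $"userId".as("key")),
        struct(lit("agg-by-region").as("topic"), $"region".as("key")))))
      .select($"route.topic".as("topic"),
              $"route.key".cast("string").as("key"),
              to_json(struct($"userId", $"region", $"count")).as("value"))

    fannedOut.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("checkpointLocation", "/tmp/checkpoints/fanout") // hypothetical
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}

On question 2, cascading would also avoid recomputing the aggregation, at the
cost of an extra write/read round trip through Kafka for each downstream query.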


Thanks in advance for any help.


Priyank


[Structured Streaming] Recommended way of joining streams

2017-08-09 Thread Priyank Shrivastava
I have streams of data coming in from various applications through Kafka.
These streams are converted into dataframes in Spark.  I would like to join
these dataframes on a common ID they all contain.

Since joining streaming DataFrames is currently not supported, what is the
currently recommended way to join two DataFrames in a streaming context?

Is it recommended to keep writing the streaming DataFrames into some sink
to convert them into static DataFrames, which can then be joined?  Would
this preserve end-to-end exactly-once and fault-tolerance guarantees?
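
For reference, stream-stream joins landed later, in Spark 2.3; until then, the
interim-sink approach above (or a stream-static join) was the usual workaround.
A minimal sketch of the 2.3-style join, with watermarks and a time-range
condition to bound the join state (topics, fields, and brokers are hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object StreamStreamJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stream-stream-join-sketch").getOrCreate()
    import spark.implicits._

    // Helper: read a Kafka topic and pull out the common id plus the event time.
    def kafkaStream(topic: String) = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", topic)
      .load()
      .select(get_json_object($"value".cast("string"), "$.id").as("id"), $"timestamp")

    val clicks = kafkaStream("clicks")
      .withColumnRenamed("id", "clickId")
      .withColumnRenamed("timestamp", "clickTime")
      .withWatermark("clickTime", "10 minutes")

    val impressions = kafkaStream("impressions")
      .withColumnRenamed("id", "impressionId")
      .withColumnRenamed("timestamp", "impressionTime")
      .withWatermark("impressionTime", "10 minutes")

    // Inner join on the common id, bounded in time so old state can be dropped.
    val joined = clicks.join(
      impressions,
      expr("""clickId = impressionId AND
              clickTime >= impressionTime AND
              clickTime <= impressionTime + interval 1 hour"""))

    joined.writeStream
      .format("console")             // stand-in sink
      .option("checkpointLocation", "/tmp/checkpoints/join-sketch")
      .start()
      .awaitTermination()
  }
}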

Priyank


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Priyank Shrivastava
Also, in your example, doesn't the temp view need to be accessed through the
same SparkSession on the Scala side?  Since I am not using a notebook, how
can I get access to the same SparkSession in Scala?
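
A minimal sketch of the Scala side, assuming it runs in the same driver JVM as
the PySpark code: SparkSession.builder().getOrCreate() there returns the session
PySpark already created, so a temp view registered from Python (e.g. "tmp1") is
visible; the sink below is a stand-in:

import org.apache.spark.sql.SparkSession

object ScalaSideSketch {
  def startSink(): Unit = {
    // Same JVM, same session: getOrCreate() picks up the existing SparkSession,
    // so temp views registered from the Python side are visible here.
    val spark = SparkSession.builder().getOrCreate()

    spark.table("tmp1")            // temp view created by the PySpark code
      .writeStream
      .format("console")           // stand-in for the custom sink / ForeachWriter
      .option("checkpointLocation", "/tmp/checkpoints/scala-side")
      .start()
  }
}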

On Fri, Jul 28, 2017 at 3:17 PM, Priyank Shrivastava <priy...@asperasoft.com
> wrote:

> Thanks Burak.
>
> In a streaming context would I need to do any state management for the
> temp views? for example across sliding windows.
>
> Priyank
>
> On Fri, Jul 28, 2017 at 3:13 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi Priyank,
>>
>> You may register them as temporary tables to use across language
>> boundaries.
>>
>> Python:
>> df = spark.readStream...
>> # Python logic
>> df.createOrReplaceTempView("tmp1")
>>
>> Scala:
>> val df = spark.table("tmp1")
>> df.writeStream
>>   .foreach(...)
>>
>>
>> On Fri, Jul 28, 2017 at 3:06 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> TD,
>>>
>>> For a hybrid python-scala approach, what's the recommended way of
>>> handing off a dataframe from python to scala.  I would like to know
>>> especially in a streaming context.
>>>
>>> I am not using notebooks/databricks.  We are running it on our own spark
>>> 2.1 cluster.
>>>
>>> Priyank
>>>
>>> On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> We see that all the time. For example, in SQL, people can write their
>>>> user-defined function in Scala/Java and use it from SQL/python/anywhere.
>>>> That is the recommended way to get the best combo of performance and
>>>> ease-of-use from non-jvm languages.
>>>>
>>>> On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
>>>> priy...@asperasoft.com> wrote:
>>>>
>>>>> Thanks TD.  I am going to try the python-scala hybrid approach by
>>>>> using scala only for custom redis sink and python for the rest of the app
>>>>> .  I understand it might not be as efficient as purely writing the app in
>>>>> scala but unfortunately I am constrained on scala resources.  Have you 
>>>>> come
>>>>> across other use cases where people have resorted to such python-scala
>>>>> hybrid approach?
>>>>>
>>>>> Regards,
>>>>> Priyank
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Hello Priyank
>>>>>>
>>>>>> Writing something purely in Scala/Java would be the most efficient.
>>>>>> Even if we expose python APIs that allow writing custom sinks in pure
>>>>>> Python, it won't be as efficient as Scala/Java foreach as the data would
>>>>>> have to go through JVM / PVM boundary which has significant overheads. So
>>>>>> Scala/Java foreach is always going to be the best option.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>>>>>> priy...@asperasoft.com> wrote:
>>>>>>
>>>>>>> I am trying to write key-values to redis using a DataStreamWriter
>>>>>>> object using pyspark structured streaming APIs. I am using Spark 2.2
>>>>>>>
>>>>>>> Since the Foreach Sink is not supported for python; here
>>>>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach>,
>>>>>>> I am trying to find out some alternatives.
>>>>>>>
>>>>>>> One alternative is to write a separate Scala module only to push
>>>>>>> data into redis using foreach; ForeachWriter
>>>>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter>
>>>>>>>  is
>>>>>>> supported in Scala. BUT this doesn't seem like an efficient approach and
>>>>>>> adds deployment overhead because now I will have to support Scala in my 
>>>>>>> app.
>>>>>>>
>>>>>>> Another approach is obviously to use Scala instead of python, which
>>>>>>> is fine but I want to make sure that I absolutely cannot use python for
>>>>>>> this problem before I take this path.
>>>>>>>
>>>>>>> Would appreciate some feedback and alternative design approaches for
>>>>>>> this problem.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Priyank Shrivastava
Thanks Burak.

In a streaming context would I need to do any state management for the temp
views? for example across sliding windows.

Priyank

On Fri, Jul 28, 2017 at 3:13 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi Priyank,
>
> You may register them as temporary tables to use across language
> boundaries.
>
> Python:
> df = spark.readStream...
> # Python logic
> df.createOrReplaceTempView("tmp1")
>
> Scala:
> val df = spark.table("tmp1")
> df.writeStream
>   .foreach(...)
>
>
> On Fri, Jul 28, 2017 at 3:06 PM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> TD,
>>
>> For a hybrid python-scala approach, what's the recommended way of handing
>> off a dataframe from python to scala.  I would like to know especially in a
>> streaming context.
>>
>> I am not using notebooks/databricks.  We are running it on our own spark
>> 2.1 cluster.
>>
>> Priyank
>>
>> On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> We see that all the time. For example, in SQL, people can write their
>>> user-defined function in Scala/Java and use it from SQL/python/anywhere.
>>> That is the recommended way to get the best combo of performance and
>>> ease-of-use from non-jvm languages.
>>>
>>> On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:
>>>
>>>> Thanks TD.  I am going to try the python-scala hybrid approach by using
>>>> scala only for custom redis sink and python for the rest of the app .  I
>>>> understand it might not be as efficient as purely writing the app in scala
>>>> but unfortunately I am constrained on scala resources.  Have you come
>>>> across other use cases where people have resorted to such python-scala
>>>> hybrid approach?
>>>>
>>>> Regards,
>>>> Priyank
>>>>
>>>>
>>>>
>>>> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Hello Priyank
>>>>>
>>>>> Writing something purely in Scala/Java would be the most efficient.
>>>>> Even if we expose python APIs that allow writing custom sinks in pure
>>>>> Python, it won't be as efficient as Scala/Java foreach as the data would
>>>>> have to go through JVM / PVM boundary which has significant overheads. So
>>>>> Scala/Java foreach is always going to be the best option.
>>>>>
>>>>> TD
>>>>>
>>>>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>>>>> priy...@asperasoft.com> wrote:
>>>>>
>>>>>> I am trying to write key-values to redis using a DataStreamWriter
>>>>>> object using pyspark structured streaming APIs. I am using Spark 2.2
>>>>>>
>>>>>> Since the Foreach Sink is not supported for python; here
>>>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach>,
>>>>>> I am trying to find out some alternatives.
>>>>>>
>>>>>> One alternative is to write a separate Scala module only to push data
>>>>>> into redis using foreach; ForeachWriter
>>>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter>
>>>>>>  is
>>>>>> supported in Scala. BUT this doesn't seem like an efficient approach and
>>>>>> adds deployment overhead because now I will have to support Scala in my 
>>>>>> app.
>>>>>>
>>>>>> Another approach is obviously to use Scala instead of python, which
>>>>>> is fine but I want to make sure that I absolutely cannot use python for
>>>>>> this problem before I take this path.
>>>>>>
>>>>>> Would appreciate some feedback and alternative design approaches for
>>>>>> this problem.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-28 Thread Priyank Shrivastava
TD,

For a hybrid python-scala approach, what's the recommended way of handing
off a dataframe from python to scala.  I would like to know especially in a
streaming context.

I am not using notebooks/databricks.  We are running it on our own spark
2.1 cluster.

Priyank

On Wed, Jul 26, 2017 at 12:49 PM, Tathagata Das <tathagata.das1...@gmail.com
> wrote:

> We see that all the time. For example, in SQL, people can write their
> user-defined function in Scala/Java and use it from SQL/python/anywhere.
> That is the recommended way to get the best combo of performance and
> ease-of-use from non-jvm languages.
>
> On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> Thanks TD.  I am going to try the python-scala hybrid approach by using
>> scala only for custom redis sink and python for the rest of the app .  I
>> understand it might not be as efficient as purely writing the app in scala
>> but unfortunately I am constrained on scala resources.  Have you come
>> across other use cases where people have resorted to such python-scala
>> hybrid approach?
>>
>> Regards,
>> Priyank
>>
>>
>>
>> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hello Priyank
>>>
>>> Writing something purely in Scala/Java would be the most efficient. Even
>>> if we expose python APIs that allow writing custom sinks in pure Python, it
>>> won't be as efficient as Scala/Java foreach as the data would have to go
>>> through JVM / PVM boundary which has significant overheads. So Scala/Java
>>> foreach is always going to be the best option.
>>>
>>> TD
>>>
>>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:
>>>
>>>> I am trying to write key-values to redis using a DataStreamWriter
>>>> object using pyspark structured streaming APIs. I am using Spark 2.2
>>>>
>>>> Since the Foreach Sink is not supported for python; here
>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach>,
>>>> I am trying to find out some alternatives.
>>>>
>>>> One alternative is to write a separate Scala module only to push data
>>>> into redis using foreach; ForeachWriter
>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter>
>>>>  is
>>>> supported in Scala. BUT this doesn't seem like an efficient approach and
>>>> adds deployment overhead because now I will have to support Scala in my 
>>>> app.
>>>>
>>>> Another approach is obviously to use Scala instead of python, which is
>>>> fine but I want to make sure that I absolutely cannot use python for this
>>>> problem before I take this path.
>>>>
>>>> Would appreciate some feedback and alternative design approaches for
>>>> this problem.
>>>>
>>>> Thanks.
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Priyank Shrivastava
Thanks TD.  I am going to try the python-scala hybrid approach by using
scala only for custom redis sink and python for the rest of the app .  I
understand it might not be as efficient as purely writing the app in scala
but unfortunately I am constrained on scala resources.  Have you come
across other use cases where people have resorted to such python-scala
hybrid approach?

Regards,
Priyank



On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Hello Priyank
>
> Writing something purely in Scala/Java would be the most efficient. Even
> if we expose python APIs that allow writing custom sinks in pure Python, it
> won't be as efficient as Scala/Java foreach as the data would have to go
> through JVM / PVM boundary which has significant overheads. So Scala/Java
> foreach is always going to be the best option.
>
> TD
>
> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> I am trying to write key-values to redis using a DataStreamWriter object
>> using pyspark structured streaming APIs. I am using Spark 2.2
>>
>> Since the Foreach Sink is not supported for python; here
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach>,
>> I am trying to find out some alternatives.
>>
>> One alternative is to write a separate Scala module only to push data
>> into redis using foreach; ForeachWriter
>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter>
>>  is
>> supported in Scala. BUT this doesn't seem like an efficient approach and
>> adds deployment overhead because now I will have to support Scala in my app.
>>
>> Another approach is obviously to use Scala instead of python, which is
>> fine but I want to make sure that I absolutely cannot use python for this
>> problem before I take this path.
>>
>> Would appreciate some feedback and alternative design approaches for this
>> problem.
>>
>> Thanks.
>>
>>
>>
>>
>


[SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-25 Thread Priyank Shrivastava
I am trying to write key-values to redis using a DataStreamWriter object
using pyspark structured streaming APIs. I am using Spark 2.2

Since the Foreach Sink is not supported for python; here
<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach>,
I am trying to find out some alternatives.

One alternative is to write a separate Scala module only to push data into
redis using foreach; ForeachWriter
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter>
is supported in Scala. BUT this doesn't seem like an efficient approach and
adds deployment overhead because now I will have to support Scala in my app.

Another approach is obviously to use Scala instead of python, which is fine
but I want to make sure that I absolutely cannot use python for this
problem before I take this path.

Would appreciate some feedback and alternative design approaches for this
problem.

Thanks.
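
A hedged sketch of the Scala-only module option: a ForeachWriter that opens one
connection per partition and writes each record as a Redis key/value. It assumes
the Jedis client is on the classpath; the host, port, record shape, and stand-in
source below are hypothetical:

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import redis.clients.jedis.Jedis

class RedisForeachWriter(host: String, port: Int) extends ForeachWriter[Row] {
  @transient private var jedis: Jedis = _

  override def open(partitionId: Long, version: Long): Boolean = {
    jedis = new Jedis(host, port)   // one connection per partition/epoch
    true                            // process every row in this partition
  }

  override def process(record: Row): Unit = {
    // Assumes the incoming rows have string "key" and "value" columns.
    jedis.set(record.getAs[String]("key"), record.getAs[String]("value"))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}

object RedisSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("redis-foreach-sketch").getOrCreate()

    // Stand-in stream; in the hybrid setup this would be the temp view registered
    // from the PySpark side, e.g. spark.table("tmp1").
    val df = spark.readStream.format("rate").load()
      .selectExpr("CAST(value AS STRING) AS key", "CAST(timestamp AS STRING) AS value")

    df.writeStream
      .foreach(new RedisForeachWriter("localhost", 6379))
      .option("checkpointLocation", "/tmp/checkpoints/redis-sink")
      .start()
      .awaitTermination()
  }
}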