[ 
https://issues.apache.org/jira/browse/SPARK-30634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yurii Oleynikov updated SPARK-30634:
------------------------------------
    Description: 
Hi,

I have an application that performs Arbitrary Stateful Processing in Structured 
Streaming and uses delta.merge to update a Delta table. I have run into some 
strange behaviour:

1. Log messages inside my implementation of 
{{MapGroupsWithStateFunction}}/{{FlatMapGroupsWithStateFunction}} are 
printed twice.

2. While looking for the root cause, I also found that the number of state rows 
reported by Spark doubles.

 

I thought there might be a bug in my code, so I went back to 
{{JavaStructuredSessionization}} from the Apache Spark examples and changed it a 
bit. I still got the same result.

The problem happens only if I do not call batchDf.persist() inside 
foreachBatch.
{code:java}
StreamingQuery query = sessionUpdates
        .writeStream()
        .outputMode("update")
        .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
            // Without persisting batchDf, the following merge doubles the number of
            // Spark state rows and causes MapGroupsWithStateFunction to log twice.
            deltaTable.as("sessions")
                    .merge(batchDf.toDF().as("updates"), mergeExpr)
                    .whenNotMatched().insertAll()
                    .whenMatched().updateAll()
                    .execute();
        })
        .trigger(Trigger.ProcessingTime(10000))
        .queryName("ACME")
        .start();
{code}
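One possible explanation, stated here as an assumption rather than a confirmed diagnosis: the batch Dataset is evaluated lazily, and the Delta documentation notes that merge may scan its source more than once. If so, each scan would re-run the stateful function unless the batch is persisted first. The plain-Java sketch below only illustrates that mechanism. It uses no Spark or Delta, and every name in it ({{LazyBatchDemo}}, {{lazyBatch}}, {{persisted}}, {{mergeWithTwoPasses}}) is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the streaming query; no Spark or Delta involved.
class LazyBatchDemo {

    // Stand-in for a lazily evaluated batch: every get() re-runs the "state function".
    static Supplier<List<String>> lazyBatch(AtomicInteger calls) {
        return () -> {
            calls.incrementAndGet(); // corresponds to one round of log output
            return Arrays.asList("session-1", "session-2");
        };
    }

    // Stand-in for persist(): evaluate on first use, then serve the cached rows.
    static <T> Supplier<T> persisted(Supplier<T> source) {
        return new Supplier<T>() {
            private T cached;
            private boolean evaluated;

            @Override
            public T get() {
                if (!evaluated) {
                    cached = source.get();
                    evaluated = true;
                }
                return cached;
            }
        };
    }

    // Stand-in for a merge that reads its source twice (an assumption of this sketch).
    static int mergeWithTwoPasses(Supplier<List<String>> source) {
        int firstPass = source.get().size();  // e.g. find matching target rows
        int secondPass = source.get().size(); // e.g. write updated and inserted rows
        return firstPass + secondPass;
    }

    // Returns how many times the "state function" ran for one simulated batch.
    static int stateFunctionCalls(boolean persist) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<List<String>> batch = lazyBatch(calls);
        mergeWithTwoPasses(persist ? persisted(batch) : batch);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("without persist: " + stateFunctionCalls(false));
        System.out.println("with persist:    " + stateFunctionCalls(true));
    }
}
```

In this sketch the unpersisted batch is evaluated twice and the persisted one once, which would match the doubled log output described above if the assumption about two source scans holds.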
According to 
[https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and the 
[Apache Spark 
docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch], 
there seems to be no need to persist the Dataset/DataFrame inside 
{{foreachBatch}}.

Sample code, adapted from the Apache Spark examples to use Delta: 
[JavaStructuredSessionization with Delta 
merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]

 

 

I would appreciate any clarification.

 


> Delta Merge and Arbitrary Stateful Processing in Structured streaming  
> (foreachBatch)
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-30634
>                 URL: https://issues.apache.org/jira/browse/SPARK-30634
>             Project: Spark
>          Issue Type: Question
>          Components: Examples, Spark Core, Structured Streaming
>    Affects Versions: 2.4.3
>         Environment: Spark 2.4.3 (scala 2.11.12)
> Delta: 0.5.0
> Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
> OS: Ubuntu 18.04 LTS
>  
>            Reporter: Yurii Oleynikov
>            Priority: Trivial
>         Attachments: Capture1.PNG
>
>


