[
https://issues.apache.org/jira/browse/SPARK-30634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-30634.
---------------------------------
> Delta Merge and Arbitrary Stateful Processing in Structured streaming
> (foreachBatch)
> -------------------------------------------------------------------------------------
>
> Key: SPARK-30634
> URL: https://issues.apache.org/jira/browse/SPARK-30634
> Project: Spark
> Issue Type: Question
> Components: Examples, Spark Core, Structured Streaming
> Affects Versions: 2.4.3
> Environment: Spark 2.4.3 (scala 2.11.12)
> Delta: 0.5.0
> Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
> OS: Ubuntu 18.04 LTS
>
> Reporter: Yurii Oleynikov
> Priority: Trivial
> Attachments: Capture1.PNG
>
>
> Hi,
> I have an application that performs Arbitrary Stateful Processing in Structured
> Streaming and uses delta.merge to update a Delta table, and I ran into strange
> behaviour:
> 1. I noticed that log messages inside my {{MapGroupsWithStateFunction}}/
> {{FlatMapGroupsWithStateFunction}} implementation are output twice.
> 2. While looking for the root cause I also found that the number of state rows
> reported by Spark doubles.
>
> I thought there might be a bug in my code, so I went back to
> {{JavaStructuredSessionization}} from the Apache Spark examples and changed it a
> bit. I still got the same result.
> The problem happens only if I do not call {{batchDf.persist()}} inside
> foreachBatch.
> {code:java}
> StreamingQuery query = sessionUpdates
>     .writeStream()
>     .outputMode("update")
>     .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
>         // the following doubles the number of Spark state rows and causes
>         // MapGroupsWithStateFunction to log twice without persisting
>         deltaTable.as("sessions")
>             .merge(batchDf.toDF().as("updates"), mergeExpr)
>             .whenNotMatched().insertAll()
>             .whenMatched().updateAll()
>             .execute();
>     })
>     .trigger(Trigger.ProcessingTime(10000))
>     .queryName("ACME")
>     .start();
> {code}
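> For completeness, a sketch of the workaround that avoided the doubling: persisting
> the micro-batch DataFrame before the merge and unpersisting it afterwards (the
> surrounding variables are the same as in the snippet above):
> {code:java}
> StreamingQuery query = sessionUpdates
>     .writeStream()
>     .outputMode("update")
>     .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
>         batchDf.persist(); // materialize the micro-batch once before the merge
>         deltaTable.as("sessions")
>             .merge(batchDf.toDF().as("updates"), mergeExpr)
>             .whenNotMatched().insertAll()
>             .whenMatched().updateAll()
>             .execute();
>         batchDf.unpersist(); // release the cached batch after the merge
>     })
>     .trigger(Trigger.ProcessingTime(10000))
>     .queryName("ACME")
>     .start();
> {code}
> With the persist in place the state row count and the log output are no longer
> doubled.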
> According to
> [https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and the
> [Apache Spark
> docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch]
> there seems to be no need to persist the dataset/dataframe inside
> {{foreachBatch}}.
> Sample code from the Apache Spark examples, modified to use Delta merge:
> [JavaStructuredSessionization with Delta
> merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]
>
>
> I would appreciate your clarification.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]