Yurii Oleynikov created SPARK-30634:
---------------------------------------
Summary: Delta Merge and Arbitrary Stateful Processing in
Structured streaming (foreachBatch)
Key: SPARK-30634
URL: https://issues.apache.org/jira/browse/SPARK-30634
Project: Spark
Issue Type: Question
Components: Examples, Spark Core, Structured Streaming
Affects Versions: 2.4.3
Environment: Spark 2.4.3 (scala 2.11.12)
Delta: 0.5.0
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
OS: Ubuntu 18.04 LTS
Reporter: Yurii Oleynikov
Attachments: Capture1.PNG
Hi, I've run into strange behaviour with Delta merge and Arbitrary Stateful
Processing in Structured Streaming.
I have an application that performs Arbitrary Stateful Processing in Structured
Streaming and uses delta.merge to update a Delta table.
I've noticed that the log statements inside my implementation of
{{MapGroupsWithStateFunction}}/{{FlatMapGroupsWithStateFunction}} are
printed twice.
While looking for the root cause, I also found that the number of state rows
reported by Spark doubles as well.
I thought there might be a bug in my code, so I went back to the
{{JavaStructuredSessionization}} example from Apache Spark and modified it slightly.
I still got the same result.
The problem occurs only if I do not call batchDf.persist() inside
foreachBatch.
{code:java}
StreamingQuery query = sessionUpdates
    .writeStream()
    .outputMode("update")
    .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
        // Without persisting batchDf, the following merge doubles the number of
        // Spark state rows and causes MapGroupsWithStateFunction to log twice.
        deltaTable.as("sessions")
            .merge(batchDf.toDF().as("updates"), mergeExpr)
            .whenNotMatched().insertAll()
            .whenMatched().updateAll()
            .execute();
    })
    .trigger(Trigger.ProcessingTime(10000))
    .queryName("ACME")
    .start();
{code}
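To make the symptom easier to reason about, here is a minimal plain-Java model of what I suspect is happening. It is not Spark or Delta code: {{LazyBatchSketch}}, {{updateState}}, {{twoPassSink}} and {{persisted}} are hypothetical names, and the assumption that the sink reads its source twice is only my guess based on the doubled log lines, not something I have confirmed in the Delta implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Plain-Java model, NOT Spark code: a "lazy batch" is a Supplier that re-runs
// the upstream function every time it is read.
class LazyBatchSketch {

    // Counts how often the (hypothetical) state-update function is invoked.
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for the body of a MapGroupsWithStateFunction.
    static String updateState(String key) {
        calls.incrementAndGet();
        return key + ":updated";
    }

    // Lazy batch: nothing is computed until get() is called.
    static Supplier<List<String>> lazyBatch(List<String> keys) {
        return () -> keys.stream()
                .map(LazyBatchSketch::updateState)
                .collect(Collectors.toList());
    }

    // ASSUMPTION: a sink that reads its source twice (for example one pass to
    // find matches and one pass to write). Returns the total rows read.
    static int twoPassSink(Supplier<List<String>> source) {
        int firstPass = source.get().size();
        int secondPass = source.get().size();
        return firstPass + secondPass;
    }

    // Analog of persist(): materialize once, then serve the same list on every read.
    static Supplier<List<String>> persisted(Supplier<List<String>> source) {
        List<String> materialized = source.get();
        return () -> materialized;
    }

    // Runs the two-pass sink over three keys and reports how many times
    // updateState was invoked.
    static int callsFor(boolean persist) {
        calls.set(0);
        Supplier<List<String>> batch = lazyBatch(Arrays.asList("a", "b", "c"));
        twoPassSink(persist ? persisted(batch) : batch);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("without persist: " + callsFor(false)); // prints 6
        System.out.println("with persist: " + callsFor(true));     // prints 3
    }
}
```

In this model the state function runs twice per key unless the batch is materialized first, which matches what I observe with and without persisting the batch in the snippet above.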
According to
[https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and the
[Apache Spark
docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch],
there seems to be no need to persist the dataset/dataframe inside
{{foreachBatch}}.
Sample code from Apache Spark examples with delta:
[JavaStructuredSessionization with Delta
merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]
I would appreciate any clarification.