[ https://issues.apache.org/jira/browse/SPARK-30634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun closed SPARK-30634.
---------------------------------

> Delta Merge and Arbitrary Stateful Processing in Structured streaming (foreachBatch)
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-30634
>                 URL: https://issues.apache.org/jira/browse/SPARK-30634
>             Project: Spark
>          Issue Type: Question
>          Components: Examples, Spark Core, Structured Streaming
>    Affects Versions: 2.4.3
>         Environment: Spark 2.4.3 (scala 2.11.12)
> Delta: 0.5.0
> Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
> OS: Ubuntu 18.04 LTS
>            Reporter: Yurii Oleynikov
>            Priority: Trivial
>         Attachments: Capture1.PNG
>
>
> Hi,
> I have an application that performs Arbitrary Stateful Processing in Structured Streaming and uses delta.merge to update a Delta table, and I have run into strange behaviour:
> 1. Log statements inside my implementation of {{MapGroupsWithStateFunction}}/{{FlatMapGroupsWithStateFunction}} are output twice.
> 2. While looking for the root cause, I also found that the number of state rows reported by Spark is doubled.
>
> I thought there might be a bug in my code, so I went back to {{JavaStructuredSessionization}} from the Apache Spark examples and changed it a bit. I still got the same result.
> The problem occurs only if I do not call {{batchDf.persist()}} inside foreachBatch.
> {code:java}
> import io.delta.tables.DeltaTable;
> import org.apache.spark.api.java.function.VoidFunction2;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.streaming.StreamingQuery;
> import org.apache.spark.sql.streaming.Trigger;
>
> // sessionUpdates, deltaTable and mergeExpr (the merge condition) come from the
> // surrounding example code (see the linked JavaStructuredSessionization).
> StreamingQuery query = sessionUpdates
>     .writeStream()
>     .outputMode("update")
>     .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, batchId) -> {
>         // Without persisting batchDf, the following doubles the number of Spark state rows
>         // and causes MapGroupsWithStateFunction to log twice
>         deltaTable.as("sessions")
>             .merge(batchDf.toDF().as("updates"), mergeExpr)
>             .whenNotMatched().insertAll()
>             .whenMatched().updateAll()
>             .execute();
>     })
>     .trigger(Trigger.ProcessingTime(10000))
>     .queryName("ACME")
>     .start();
> {code}
> According to [https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and [Apache Spark docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch], there seems to be no need to persist the Dataset/DataFrame inside {{foreachBatch}}.
> Sample code from the Apache Spark examples with Delta:
> [JavaStructuredSessionization with Delta merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]
>
> I would appreciate your clarification.
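The symptom reported above is consistent with the micro-batch Dataset being recomputed each time the merge reads it, which would re-run the stateful function (and its logging) once per read. The sketch below is a toy, plain-Java model, not Spark or Delta code: it assumes, without confirming it from Delta's source, that the merge reads its source twice. The names `RecomputationDemo`, `lazyBatch`, `cached` and `mergeReadingSourceTwice` are hypothetical. It counts how often a stand-in for the stateful step runs with and without caching the batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy model only: a lazily evaluated "batch" whose evaluation runs a stateful step,
// consumed twice by a hypothetical merge-like sink.
class RecomputationDemo {

    // Counts how many times the stand-in for the stateful function runs.
    static final AtomicInteger statefulCalls = new AtomicInteger();

    // Stand-in for a lazy query plan: every get() re-runs the stateful step for every key.
    static Supplier<List<String>> lazyBatch(List<String> keys) {
        return () -> {
            List<String> out = new ArrayList<>();
            for (String key : keys) {
                statefulCalls.incrementAndGet(); // where the duplicate log line would appear
                out.add(key.toUpperCase());
            }
            return out;
        };
    }

    // Stand-in for persist(): evaluate the source once, then reuse the materialized result.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed = false;

            @Override
            public T get() {
                if (!computed) {
                    value = source.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    // Hypothetical merge that reads its source twice (assumption: once to find
    // matches, once to write rows).
    static void mergeReadingSourceTwice(Supplier<List<String>> source) {
        List<String> firstPass = source.get();
        List<String> secondPass = source.get();
    }

    // Runs one "micro-batch" and returns how many times the stateful step executed.
    static int runAndCount(boolean persist, List<String> keys) {
        statefulCalls.set(0);
        Supplier<List<String>> batch = lazyBatch(keys);
        mergeReadingSourceTwice(persist ? cached(batch) : batch);
        return statefulCalls.get();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("session-a", "session-b");
        System.out.println("without persist: " + runAndCount(false, keys)); // prints 4
        System.out.println("with persist: " + runAndCount(true, keys));     // prints 2
    }
}
```

In the actual streaming job, the analogous step is the workaround the reporter describes: call {{batchDf.persist()}} before the merge and {{batchDf.unpersist()}} after it, so that both reads of the batch reuse the cached data instead of recomputing the stateful stage.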