[ 
https://issues.apache.org/jira/browse/SPARK-26692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Theo Diefenthal resolved SPARK-26692.
-------------------------------------
    Resolution: Invalid

I just re-read the crucial part of the documentation:
{code:java}
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-querie{code}
{code:java}
As of Spark 2.3, you cannot use other non-map-like operations before joins. 
Here are a few examples of what cannot be used.

- Cannot use streaming aggregations before joins.

- Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode 
before joins.{code}
It would still be nice if Spark raised an exception instead of silently 
delivering empty results.
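
To make the request concrete, here is a minimal sketch of the kind of pre-flight check meant above. It is plain Java and is not Spark's analyzer or any Spark API: the class `JoinPlanCheck`, the enum `Op`, and the method `validate` are hypothetical names, and the pipeline is simplified to a linear list of operators feeding one side of a join. It only encodes the documented rule that non-map-like operations may not appear before a join.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight check; this is NOT part of Spark.
final class JoinPlanCheck {

    // Simplified operator kinds; mapLike marks operators allowed before a join.
    enum Op {
        MAP(true),
        FILTER(true),
        AGGREGATE(false),
        MAP_GROUPS_WITH_STATE_UPDATE(false),
        JOIN(true);

        final boolean mapLike;

        Op(boolean mapLike) {
            this.mapLike = mapLike;
        }
    }

    // Throws if a non-map-like operator occurs upstream of the first JOIN.
    // Pipelines without a JOIN are accepted unchanged.
    static void validate(List<Op> pipeline) {
        Op blocker = null;
        for (Op op : pipeline) {
            if (op == Op.JOIN) {
                if (blocker != null) {
                    throw new IllegalStateException(
                            blocker + " before a streaming join is not supported");
                }
                return;
            }
            if (blocker == null && !op.mapLike) {
                blocker = op; // remember the first offending operator
            }
        }
    }

    public static void main(String[] args) {
        validate(Arrays.asList(Op.FILTER, Op.MAP, Op.JOIN)); // accepted
        try {
            validate(Arrays.asList(Op.AGGREGATE, Op.MAP, Op.JOIN));
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With a check along these lines, the pipeline from the issue description (aggregation, then select, then join) would fail fast instead of producing empty batches.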

> Structured Streaming: Aggregation + JOIN not working
> ----------------------------------------------------
>
>                 Key: SPARK-26692
>                 URL: https://issues.apache.org/jira/browse/SPARK-26692
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Theo Diefenthal
>            Priority: Major
>
> I tried to set up a simple streaming pipeline with two streams on two data 
> sources (CSV files), where one stream is first windowed (aggregated) and then 
> the streams are joined. As output, I chose the console sink for development, in 
> Append mode.
> After multiple hours of setup and testing, I still wasn't able to get a 
> working example running. I also found a StackOverflow topic here 
> [https://stackoverflow.com/questions/52300247/spark-scala-structured-streaming-aggregation-and-self-join]
>  where they have the same findings as I had: "In general, {{append}} output 
> mode with aggregations is not a recommended way. As far as I understand". And 
> "I had the same empty output in my case with different aggregation. Once I 
> changed it to {{update}} mode I got output. I have strong doubts that you 
> will be able to do it in {{append}} mode. My understanding is that {{append}} 
> mode only for {{map-like}} operations, e.g. filter, transform entry etc. I 
> believe that multiphase processing". 
> I observed the same and only got empty output in my example:
> {code:java}
> public static SparkSession buildSession() throws Exception {
>     return SparkSession.builder()
>             .appName("StreamGroupJoin")
>             .config("spark.sql.shuffle.partitions", 4)
>             .master("local[2]")
>             .getOrCreate();
> }
> public static Dataset<Row> loadData(SparkSession session, String filepath) {
>     return session
>             .readStream()
>             .format("csv")
>             .option("header", true)
>             .option("path", filepath)
>             .schema(new StructType()
>                     .add("ts", DataTypes.TimestampType)
>                     .add("color", DataTypes.StringType)
>                     .add("data", DataTypes.StringType))
>             .load();
> }
> public static void main(String[] args) throws Exception {
>     SparkSession session = buildSession();
>     Dataset<Row> shieldStream = loadData(session, "streamingpoc/src/main/resources/simpleSHIELD");
>     Dataset<Row> argusStream = loadData(session, "streamingpoc/src/main/resources/simpleARGUS");
>     shieldStream = shieldStream.withWatermark("ts", "0 hours");
>     argusStream = argusStream.withWatermark("ts", "0 hours");
>     argusStream = argusStream.groupBy(window(col("ts"), "24 hours"), col("color")).count();
>     argusStream = argusStream.select(
>             col("window.start").as("argusStart"),
>             col("window.end").as("argusEnd"),
>             col("color").as("argusColor"),
>             col("count").as("argusCount"));
>     //argusStream = argusStream.withWatermark("argusStart", "0 hours");
>     Dataset<Row> joinedStream = argusStream.join(shieldStream,
>             expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"));
>     joinedStream = joinedStream.withWatermark("ts", "0 hours");
>     StreamingQuery joinedQuery = joinedStream.writeStream()
>             .outputMode(OutputMode.Append())
>             .format("console")
>             .start();
>     joinedQuery.awaitTermination();
>     System.out.println("DONE");
> }{code}
> I'd like to point out that, at least in my testing version of Spark 2.4.0, it 
> is not even possible to switch to OutputMode.Update due to "_Inner join between 
> two streaming DataFrames/Datasets is not supported in Complete output mode, 
> only in Append output mode_".
> In my example, I used two simple CSV datasets having the same format and one 
> matching row which should be output after the JOIN. 
> If I work without the JOIN, both streams (aggregated and not) work fine. If I 
> work without the aggregation, the JOIN works fine. But if I use both (at least 
> in Append mode), I get no output. If I use standard Spark DataFrames instead of 
> Spark Structured Streaming, I get the result I expected.
> *Possible Solutions*
>  # Either there is a bug or misuse in my code. In that case, the ticket can 
> be closed, and I'd be happy if someone could tell me what I did wrong. I tried 
> quite a lot of different watermark settings but wasn't able to find a 
> working setup.
>  # Fix OutputMode Append if technically possible. (From my theoretical 
> understanding of big-data streaming in general, this should be possible, but 
> I'm not deep into the topic and not that familiar with Spark after all.)
>  # Make this option unavailable in Spark, i.e. raise a pipeline error 
> stating that an aggregated stream can't be joined in Append mode, as is already 
> done if I try to join two aggregated streams. In that case, the documentation 
> should also be updated to state that anyone who wants to combine aggregations 
> and JOINs is advised to write the aggregation output to a sink like Kafka and 
> re-read it from there for the join.
>  
> Following is the result I'd like to obtain (And which I get if I use Spark 
> Datasets instead of Spark Structured Streaming)
> {code:java}
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+
> |argusStart         |argusEnd           |argusColor|argusCount|ts                 |color|data    |
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+
> |2018-07-20 02:00:00|2018-07-21 02:00:00|red       |1         |2018-07-20 12:01:15|red  |Iron Man|
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+{code}
> And the dummy CSV files I created
> {{example-argus.csv}}
> {code:java}
> ts,color,data
> 2018-07-19T08:33:07Z,green,Green Lantern
> 2018-07-20T00:00:00Z,red,Aquaman
> 2018-07-20T07:00:00Z,green,Batman
> 2018-07-20T10:00:00Z,green,Flash
> 2018-07-21T10:01:13Z,green,Green Arrow
> 2018-07-22T10:01:15Z,green,Robin
> 2018-07-23T10:03:15Z,green,Starfire
> 2018-07-26T10:07:23Z,green,Supergirl
> 2018-07-26T10:13:23Z,red,Superman
> 2018-07-26T11:01:01Z,green,Wonder Woman
> 2018-07-26T14:02:11Z,green,Cyborg
> 2018-07-28T14:05:53Z,green,Harley Quinn
> 2018-07-30T05:00:13Z,green,Deadshot
> 2018-08-10T09:23:32Z,green,El Diablo
> 2018-08-10T12:12:12Z,green,Killer Croc{code}
> {{example-shield.csv}}
> {code:java}
> ts,color,data
> 2018-07-19T10:01:13Z,blue,Captain America
> 2018-07-20T10:01:15Z,red,Iron Man
> 2018-07-20T10:03:15Z,blue,Thor
> 2018-07-20T10:07:23Z,blue,Hulk
> 2018-07-20T10:13:23Z,blue,Black Widow
> 2018-07-20T11:01:01Z,blue,Hawkeye
> 2018-07-20T14:02:11Z,blue,Loki
> 2018-07-20T14:05:53Z,blue,Spider-Man
> 2018-07-21T05:00:13Z,blue,Vision
> 2018-07-21T09:23:32Z,blue,Scarlet Witch
> 2018-07-21T12:12:12Z,blue,Dr. Strange
> 2018-07-21T13:13:13Z,blue,Star-Lord
> 2018-07-22T01:52:18Z,red,Drax
> 2018-07-26T01:52:18Z,blue,Groot
> 2018-08-10T12:12:12Z,blue,Rocket{code}
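
For reference, the single expected row in the description can be reproduced without Spark. The following is a minimal plain-Java sketch, not the reporter's code and not Spark's implementation. It assumes 24-hour tumbling windows aligned to the Unix epoch in UTC, uses only a subset of the rows from the two CSV files (including every "red" row, the only color present on both sides), and the names `WindowJoinSketch`, `Event`, `windowStart`, and `aggregateThenJoin` are hypothetical. Timestamps are printed in UTC, whereas the table in the description appears to be rendered in a UTC+2 local time zone.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Spark): window, count, then join over in-memory rows.
final class WindowJoinSketch {

    // Hypothetical holder for one CSV row (ts, color, data).
    static final class Event {
        final Instant ts;
        final String color;
        final String data;

        Event(String ts, String color, String data) {
            this.ts = Instant.parse(ts);
            this.color = color;
            this.data = data;
        }
    }

    // Subset of example-argus.csv (all red rows plus a few green ones).
    static final List<Event> ARGUS = Arrays.asList(
            new Event("2018-07-19T08:33:07Z", "green", "Green Lantern"),
            new Event("2018-07-20T00:00:00Z", "red", "Aquaman"),
            new Event("2018-07-20T07:00:00Z", "green", "Batman"),
            new Event("2018-07-26T10:13:23Z", "red", "Superman"));

    // Subset of example-shield.csv (all red rows plus a few blue ones).
    static final List<Event> SHIELD = Arrays.asList(
            new Event("2018-07-19T10:01:13Z", "blue", "Captain America"),
            new Event("2018-07-20T10:01:15Z", "red", "Iron Man"),
            new Event("2018-07-22T01:52:18Z", "red", "Drax"),
            new Event("2018-07-26T01:52:18Z", "blue", "Groot"));

    // Start of the tumbling window containing ts (assumption: epoch-aligned, UTC).
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long t = ts.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMs));
    }

    // Counts argus rows per (window start, color), then joins shield rows on
    // color = argusColor AND ts >= argusStart AND ts <= argusEnd.
    static List<String> aggregateThenJoin(List<Event> argus, List<Event> shield, Duration size) {
        Map<Instant, Map<String, Long>> counts = new TreeMap<>();
        for (Event e : argus) {
            counts.computeIfAbsent(windowStart(e.ts, size), k -> new TreeMap<>())
                    .merge(e.color, 1L, Long::sum);
        }
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Instant, Map<String, Long>> window : counts.entrySet()) {
            Instant start = window.getKey();
            Instant end = start.plus(size);
            for (Map.Entry<String, Long> byColor : window.getValue().entrySet()) {
                for (Event s : shield) {
                    boolean inWindow = !s.ts.isBefore(start) && !s.ts.isAfter(end);
                    if (inWindow && s.color.equals(byColor.getKey())) {
                        joined.add(start + "," + end + "," + byColor.getKey() + ","
                                + byColor.getValue() + "," + s.ts + "," + s.color + "," + s.data);
                    }
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Columns: argusStart,argusEnd,argusColor,argusCount,ts,color,data
        for (String row : aggregateThenJoin(ARGUS, SHIELD, Duration.ofHours(24))) {
            System.out.println(row);
        }
    }
}
```

On this data the only match is the "Iron Man" row joined to the red window starting 2018-07-20T00:00:00Z with a count of 1, which agrees with the batch result shown in the description.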