[
https://issues.apache.org/jira/browse/SPARK-26692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Theo Diefenthal resolved SPARK-26692.
-------------------------------------
Resolution: Invalid
Just read the crucial part of the doc again
{code:java}
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-querie{code}
{code:java}
As of Spark 2.3, you cannot use other non-map-like operations before joins.
Here are a few examples of what cannot be used.
- Cannot use streaming aggregations before joins.
- Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode
before joins.{code}
It would still be nice if Spark raised an exception instead of silently
delivering empty results.
> Structured Streaming: Aggregation + JOIN not working
> ----------------------------------------------------
>
> Key: SPARK-26692
> URL: https://issues.apache.org/jira/browse/SPARK-26692
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Theo Diefenthal
> Priority: Major
>
> I tried to set up a simple streaming pipeline with two streams on two data
> sources (CSV files) where one stream is first windowed (aggregated) and then
> the streams are joined. As output, I chose console for development in Append
> mode.
> After multiple hours of setup and testing, I still wasn't able to get a
> working example running. I also found a StackOverflow topic here
> [https://stackoverflow.com/questions/52300247/spark-scala-structured-streaming-aggregation-and-self-join]
> where they have the same findings as I had: "In general, {{append}} output
> mode with aggregations is not a recommended way. As far as I understand". And
> "I had the same empty output in my case with different aggregation. Once I
> changed it to {{update}} mode I got output. I have strong doubts that you
> will be able to do it in {{append}} mode. My understanding is that {{append}}
> mode only for {{map-like}} operations, e.g. filter, transform entry etc. I
> believe that multiphase processing".
> I observed the same and only got empty output in my example:
> {code:java}
> public static SparkSession buildSession() throws Exception {
>     return SparkSession.builder()
>             .appName("StreamGroupJoin")
>             .config("spark.sql.shuffle.partitions", 4)
>             .master("local[2]")
>             .getOrCreate();
> }
>
> public static Dataset<Row> loadData(SparkSession session, String filepath) {
>     return session
>             .readStream()
>             .format("csv")
>             .option("header", true)
>             .option("path", filepath)
>             .schema(new StructType()
>                     .add("ts", DataTypes.TimestampType)
>                     .add("color", DataTypes.StringType)
>                     .add("data", DataTypes.StringType))
>             .load();
> }
>
> public static void main(String[] args) throws Exception {
>     SparkSession session = buildSession();
>     Dataset<Row> shieldStream = loadData(session, "streamingpoc/src/main/resources/simpleSHIELD");
>     Dataset<Row> argusStream = loadData(session, "streamingpoc/src/main/resources/simpleARGUS");
>     shieldStream = shieldStream.withWatermark("ts", "0 hours");
>     argusStream = argusStream.withWatermark("ts", "0 hours");
>     argusStream = argusStream.groupBy(window(col("ts"), "24 hours"), col("color")).count();
>     argusStream = argusStream.select(
>             col("window.start").as("argusStart"),
>             col("window.end").as("argusEnd"),
>             col("color").as("argusColor"),
>             col("count").as("argusCount"));
>     //argusStream = argusStream.withWatermark("argusStart", "0 hours");
>     Dataset<Row> joinedStream = argusStream.join(shieldStream,
>             expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"));
>     joinedStream = joinedStream.withWatermark("ts", "0 hours");
>     StreamingQuery joinedQuery = joinedStream.writeStream()
>             .outputMode(OutputMode.Append())
>             .format("console")
>             .start();
>     joinedQuery.awaitTermination();
>     System.out.println("DONE");
> }{code}
> I'd like to point out that, at least in my testing version of Spark 2.4.0, it
> is not even possible to switch to OutputMode.Update due to "_Inner join between
> two streaming DataFrames/Datasets is not supported in Complete output mode,
> only in Append output mode_"
> In my example, I used two simple CSV datasets having the same format and one
> matching row which should be output after the JOIN.
> If I work without JOIN, both streams (aggregated and not) work fine. If I
> work without aggregation, JOIN works fine. But if I use both (at least in
> append mode), it doesn't work. If I don't use Spark Structured Streaming
> but standard Spark Dataframes, I get the result I expected.
> *Possible Solutions*
> # Either there is a bug/misuse in my code. In that case, the ticket can
> be closed and I'd be happy if someone could tell me what I did wrong. I tried
> quite a lot of different watermark settings but wasn't able to find a
> working setup.
> # Perform a fix for OutputMode Append if technically possible (from my
> theoretical understanding of big data streaming in general, this should be
> possible, but I'm not too deep into the topic and I'm not familiar with Spark
> after all).
> # Make this option unavailable in Spark (i.e. raise a pipeline error stating
> that an aggregated stream can't be joined in append mode, as is already done
> if I try to join two aggregated streams). In that case, the documentation
> should also be updated to state that anyone who wants to perform
> aggregations and JOINs is advised to write the aggregation output to
> a sink like Kafka and reread it from there for the join.
>
> Following is the result I'd like to obtain (And which I get if I use Spark
> Datasets instead of Spark Structured Streaming)
> {code:java}
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+
> |argusStart         |argusEnd           |argusColor|argusCount|ts                 |color|data    |
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+
> |2018-07-20 02:00:00|2018-07-21 02:00:00|red       |1         |2018-07-20 12:01:15|red  |Iron Man|
> +-------------------+-------------------+----------+----------+-------------------+-----+--------+{code}
> And the dummy CSV files I created
> {{example-argus.csv}}
> {code:java}
> ts,color,data
> 2018-07-19T08:33:07Z,green,Green Lantern
> 2018-07-20T00:00:00Z,red,Aquaman
> 2018-07-20T07:00:00Z,green,Batman
> 2018-07-20T10:00:00Z,green,Flash
> 2018-07-21T10:01:13Z,green,Green Arrow
> 2018-07-22T10:01:15Z,green,Robin
> 2018-07-23T10:03:15Z,green,Starfire
> 2018-07-26T10:07:23Z,green,Supergirl
> 2018-07-26T10:13:23Z,red,Superman
> 2018-07-26T11:01:01Z,green,Wonder Woman
> 2018-07-26T14:02:11Z,green,Cyborg
> 2018-07-28T14:05:53Z,green,Harley Quinn
> 2018-07-30T05:00:13Z,green,Deadshot
> 2018-08-10T09:23:32Z,green,El Diablo
> 2018-08-10T12:12:12Z,green,Killer Croc{code}
> {{example-shield.csv}}
> {code:java}
> ts,color,data
> 2018-07-19T10:01:13Z,blue,Captain America
> 2018-07-20T10:01:15Z,red,Iron Man
> 2018-07-20T10:03:15Z,blue,Thor
> 2018-07-20T10:07:23Z,blue,Hulk
> 2018-07-20T10:13:23Z,blue,Black Widow
> 2018-07-20T11:01:01Z,blue,Hawkeye
> 2018-07-20T14:02:11Z,blue,Loki
> 2018-07-20T14:05:53Z,blue,Spider-Man
> 2018-07-21T05:00:13Z,blue,Vision
> 2018-07-21T09:23:32Z,blue,Scarlet Witch
> 2018-07-21T12:12:12Z,blue,Dr. Strange
> 2018-07-21T13:13:13Z,blue,Star-Lord
> 2018-07-22T01:52:18Z,red,Drax
> 2018-07-26T01:52:18Z,blue,Groot
> 2018-08-10T12:12:12Z,blue,Rocket{code}
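For reference, the single expected row can be re-derived from the two sample files above without Spark. The following is only a minimal plain-Java sketch of the batch semantics (24-hour tumbling windows counted per color, then the join condition from the report), not how Spark evaluates the query. The class name `BatchWindowJoin` and its helper methods are hypothetical, and the sketch assumes windows are aligned to the Unix epoch in UTC. Timestamps are printed in UTC, whereas the table in the report appears to be rendered in a UTC+2 session time zone.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchWindowJoin {
    static final long WINDOW_SECONDS = 24 * 60 * 60;

    // Rows copied from example-argus.csv and example-shield.csv (format: ts,color,data).
    static final String[] ARGUS = {
        "2018-07-19T08:33:07Z,green,Green Lantern", "2018-07-20T00:00:00Z,red,Aquaman",
        "2018-07-20T07:00:00Z,green,Batman", "2018-07-20T10:00:00Z,green,Flash",
        "2018-07-21T10:01:13Z,green,Green Arrow", "2018-07-22T10:01:15Z,green,Robin",
        "2018-07-23T10:03:15Z,green,Starfire", "2018-07-26T10:07:23Z,green,Supergirl",
        "2018-07-26T10:13:23Z,red,Superman", "2018-07-26T11:01:01Z,green,Wonder Woman",
        "2018-07-26T14:02:11Z,green,Cyborg", "2018-07-28T14:05:53Z,green,Harley Quinn",
        "2018-07-30T05:00:13Z,green,Deadshot", "2018-08-10T09:23:32Z,green,El Diablo",
        "2018-08-10T12:12:12Z,green,Killer Croc"
    };
    static final String[] SHIELD = {
        "2018-07-19T10:01:13Z,blue,Captain America", "2018-07-20T10:01:15Z,red,Iron Man",
        "2018-07-20T10:03:15Z,blue,Thor", "2018-07-20T10:07:23Z,blue,Hulk",
        "2018-07-20T10:13:23Z,blue,Black Widow", "2018-07-20T11:01:01Z,blue,Hawkeye",
        "2018-07-20T14:02:11Z,blue,Loki", "2018-07-20T14:05:53Z,blue,Spider-Man",
        "2018-07-21T05:00:13Z,blue,Vision", "2018-07-21T09:23:32Z,blue,Scarlet Witch",
        "2018-07-21T12:12:12Z,blue,Dr. Strange", "2018-07-21T13:13:13Z,blue,Star-Lord",
        "2018-07-22T01:52:18Z,red,Drax", "2018-07-26T01:52:18Z,blue,Groot",
        "2018-08-10T12:12:12Z,blue,Rocket"
    };

    // Start (epoch seconds) of the 24-hour tumbling window containing ts.
    // Assumption: windows are aligned to the Unix epoch in UTC.
    static long windowStart(Instant ts) {
        return Math.floorDiv(ts.getEpochSecond(), WINDOW_SECONDS) * WINDOW_SECONDS;
    }

    // Batch counterpart of groupBy(window(ts, "24 hours"), color).count():
    // maps "windowStart|color" to the number of rows in that group.
    static Map<String, Long> countPerWindow(String[] rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (String row : rows) {
            String[] f = row.split(",", 3);
            counts.merge(windowStart(Instant.parse(f[0])) + "|" + f[1], 1L, Long::sum);
        }
        return counts;
    }

    // Batch counterpart of the join condition
    // color = argusColor AND ts >= argusStart AND ts <= argusEnd.
    static List<String> join(Map<String, Long> argusCounts, String[] shieldRows) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : argusCounts.entrySet()) {
            String[] key = e.getKey().split("\\|", 2);
            Instant start = Instant.ofEpochSecond(Long.parseLong(key[0]));
            Instant end = start.plusSeconds(WINDOW_SECONDS);
            for (String row : shieldRows) {
                String[] f = row.split(",", 3);
                Instant ts = Instant.parse(f[0]);
                if (f[1].equals(key[1]) && !ts.isBefore(start) && !ts.isAfter(end)) {
                    // argusStart,argusEnd,argusColor,argusCount,ts,color,data
                    out.add(start + "," + end + "," + key[1] + "," + e.getValue() + "," + row);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        join(countPerWindow(ARGUS), SHIELD).forEach(System.out::println);
    }
}
```

Running `main` prints one line, `2018-07-20T00:00:00Z,2018-07-21T00:00:00Z,red,1,2018-07-20T10:01:15Z,red,Iron Man`, which corresponds to the row in the expected-result table once the two-hour display offset is taken into account.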
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)