This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 27c34bd978f [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376
27c34bd978f is described below

commit 27c34bd978f976b509cdefff7f179b2779012eae
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Mar 8 05:23:25 2023 +0900

    [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add examples of workloads unblocked by SPARK-42376, which unblocked stream-stream time interval joins followed by stateful operators.
    
    ### Why are the changes needed?
    
    We'd like to remove the description of limitations which no longer exist, as well as provide some code examples so that users can get a sense of how to use the functionality.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, documentation change.
    
    ### How was this patch tested?
    
    Created a page via `SKIP_API=1 bundle exec jekyll serve --watch` and confirmed.
    
    Screenshots:
    
    > Scala
    
    ![Screenshot 2023-03-07 9 39 36 PM](https://user-images.githubusercontent.com/1317309/223424683-e7f7e721-a0fa-4e3c-a8f0-139d060dd045.png)
    
    > Java
    
    ![Screenshot 2023-03-07 9 39 28 PM](https://user-images.githubusercontent.com/1317309/223424706-b4da49c1-f088-4513-85d6-8750b89dac56.png)
    
    > Python
    
    ![Screenshot 2023-03-07 9 37 03 PM](https://user-images.githubusercontent.com/1317309/223424412-c12500cc-946f-4e09-8b0c-6ceed5b3aeee.png)
    
    Closes #40215 from HeartSaVioR/SPARK-42591.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md | 130 ++++++++++++++++++++++++-
 1 file changed, 126 insertions(+), 4 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index cf7f0ab6e15..a71c774f328 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1928,12 +1928,134 @@ Additional details on supported joins:
 
 - As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
 
-- As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of
-  what cannot be used.
+- You cannot use mapGroupsWithState and flatMapGroupsWithState before or after joins.
 
-  - Cannot use streaming aggregations before joins.
+In Append output mode, you can construct a query with non-map-like operations, e.g. aggregation, deduplication, or another stream-stream join, before and/or after a join.
 
-  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
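+
+Both examples below reuse the `clicksWithWatermark` and `impressionsWithWatermark` streams defined earlier in this guide. As a reminder, here is a minimal sketch of such inputs; the rate source and the watermark delays are illustrative assumptions, not requirements:
+
+{% highlight scala %}
+
+// Illustrative inputs: any streaming source with an event-time column works.
+val impressions = spark.readStream.format("rate").load()
+  .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
+val clicks = spark.readStream.format("rate").load()
+  .selectExpr("value AS clickAdId", "timestamp AS clickTime")
+
+// Watermarks bound the state kept by the joins and aggregations below.
+val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+{% endhighlight %}
+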
+For example, here's a time window aggregation in both streams, followed by a stream-stream join on the event time window (the `window` struct column produced by the aggregation):
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+
+val clicksWindow = clicksWithWatermark
+  .groupBy(window($"clickTime", "1 hour"))
+  .count()
+
+val impressionsWindow = impressionsWithWatermark
+  .groupBy(window($"impressionTime", "1 hour"))
+  .count()
+
+clicksWindow.join(impressionsWindow, "window", "inner")
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+
+Dataset<Row> clicksWindow = clicksWithWatermark
+  .groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
+  .count();
+
+Dataset<Row> impressionsWindow = impressionsWithWatermark
+  .groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
+  .count();
+
+clicksWindow.join(impressionsWindow, "window", "inner");
+
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+clicksWindow = clicksWithWatermark.groupBy(
+  window(clicksWithWatermark.clickTime, "1 hour")
+).count()
+
+impressionsWindow = impressionsWithWatermark.groupBy(
+  window(impressionsWithWatermark.impressionTime, "1 hour")
+).count()
+
+clicksWindow.join(impressionsWindow, "window", "inner")
+
+{% endhighlight %}
+
+</div>
+</div>
+
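+Since joins are supported only in Append output mode (see above), running the example might look like the following minimal sketch; the console sink here is a placeholder for illustration, not part of the original example:
+
+{% highlight scala %}
+
+// Stream-stream joins currently require Append output mode.
+clicksWindow.join(impressionsWindow, "window", "inner")
+  .writeStream
+  .outputMode("append")
+  .format("console")
+  .start()
+
+{% endhighlight %}
+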
+Here's another example: a stream-stream join with a time range join condition, followed by a time window aggregation:
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+
+val joined = impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+  """),
+  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", 
"fullOuter", "leftSemi"
+)
+
+joined
+  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
+  .count()
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+Dataset<Row> joined = impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour "),
+  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", 
"fullOuter", "leftSemi"
+);
+
+joined
+  .groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
+  .count();
+
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+joined = impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """),
+  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", 
"fullOuter", "leftSemi"
+)
+
+joined.groupBy(
+  joined.clickAdId,
+  window(joined.clickTime, "1 hour")
+).count()
+
+{% endhighlight %}
+
+</div>
+</div>
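+
+One caveat with the "leftOuter" join above: impressions that never match a click are emitted with nulls in the click-side columns, so they would land in a null group during the aggregation. Here is a minimal sketch that drops them first; the filter is an illustrative choice, not part of the original example:
+
+{% highlight scala %}
+
+joined
+  .filter($"clickAdId".isNotNull)  // drop impressions left unmatched by the outer join
+  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
+  .count()
+
+{% endhighlight %}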
 
 ### Streaming Deduplication
 You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
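+
+As a minimal sketch of deduplication with a watermark (the `streamingDf`, `guid`, and `eventTime` names are assumptions for illustration):
+
+{% highlight scala %}
+
+// Keep the first occurrence of each guid; the watermark bounds how long
+// Spark must remember previously seen identifiers.
+streamingDf
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates("guid", "eventTime")
+
+{% endhighlight %}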


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
