reuvenlax commented on a change in pull request #12915:
URL: https://github.com/apache/beam/pull/12915#discussion_r517542010



##########
File path: sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better throughput.

Review comment:
       CoGroupByKey (cGBK) also uses state under the covers and potentially needs
to reiterate over state, so if anything this class is more efficient there. What
will likely make the cGBK solution more efficient is processing all data once at
end of window (EOW), instead of fetching state on every input element. However,
I would hesitate to make claims without actual benchmarks.
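
To make the access-pattern difference concrete, here is a toy sketch in plain
Java. It is not Beam code: StateAccessSketch, CountingBag, eager, and
atEndOfWindow are hypothetical names made up for illustration. It only counts
simulated state reads for the two strategies and says nothing about real runner
cost, so it is no substitute for benchmarks.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: counts simulated state reads. Not Beam code, not a benchmark. */
public class StateAccessSketch {

  /** Hypothetical stand-in for a runner-managed state cell that counts reads. */
  static final class CountingBag {
    private final List<Integer> buffered = new ArrayList<>();
    private int reads = 0;

    void add(int key) {
      buffered.add(key);
    }

    List<Integer> read() {
      reads++;
      return new ArrayList<>(buffered);
    }

    int reads() {
      return reads;
    }
  }

  /**
   * Eager strategy: each arriving element is buffered, then reads the other
   * side's state to emit matches immediately. Elements arrive interleaved
   * (left[0], right[0], left[1], ...). Returns {matches, stateReads}.
   */
  public static int[] eager(int[] left, int[] right) {
    CountingBag leftState = new CountingBag();
    CountingBag rightState = new CountingBag();
    int matches = 0;
    int n = Math.max(left.length, right.length);
    for (int i = 0; i < n; i++) {
      if (i < left.length) {
        leftState.add(left[i]);
        for (int r : rightState.read()) {
          if (r == left[i]) {
            matches++;
          }
        }
      }
      if (i < right.length) {
        rightState.add(right[i]);
        for (int l : leftState.read()) {
          if (l == right[i]) {
            matches++;
          }
        }
      }
    }
    return new int[] {matches, leftState.reads() + rightState.reads()};
  }

  /**
   * End-of-window strategy: buffer everything, then read each side once and
   * join in a single pass. Returns {matches, stateReads}.
   */
  public static int[] atEndOfWindow(int[] left, int[] right) {
    CountingBag leftState = new CountingBag();
    CountingBag rightState = new CountingBag();
    for (int l : left) {
      leftState.add(l);
    }
    for (int r : right) {
      rightState.add(r);
    }
    List<Integer> rights = rightState.read();
    int matches = 0;
    for (int l : leftState.read()) {
      for (int r : rights) {
        if (l == r) {
          matches++;
        }
      }
    }
    return new int[] {matches, leftState.reads() + rightState.reads()};
  }

  public static void main(String[] args) {
    int[] left = {1, 2, 3};
    int[] right = {2, 3, 4};
    int[] e = eager(left, right);
    int[] w = atEndOfWindow(left, right);
    System.out.println("eager: matches=" + e[0] + ", stateReads=" + e[1]);
    System.out.println("end of window: matches=" + w[0] + ", stateReads=" + w[1]);
  }
}
```

Both strategies find the same matches; in this model the eager one does one
state read per input element while the end-of-window one does one read per
side. Whether that translates into a throughput difference on a real runner is
exactly what a benchmark would need to show.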





