reuvenlax commented on a change in pull request #12915:
URL: https://github.com/apache/beam/pull/12915#discussion_r516142798
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
Review comment:
I think we should explore another name for this class. IMO TemporalJoin
is best used for a join that is actually based on temporal properties - e.g.
FOR SYSTEM_TIME AS OF T in SQL. This seems like it's just a regular inner join
with a join timeout, plus use of state to allow faster emission of joined
elements.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
Review comment:
Use {@link} tags instead of {@code} here so that the Javadoc links directly
to the referenced functions.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
Review comment:
Consider making this an AutoValue class.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
Review comment:
Add a name parameter to every apply call; the Flatten above is applied
without one.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
Review comment:
Don't mark the OrderedListStates as @AlwaysFetched, as this will cause us
to fetch both lists in their entirety on every processElement call.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
Review comment:
This will be cleared if a process restarts or if the in-memory object is
ever recycled. Is this necessary? If so, I would make it a ValueState so it
persists; otherwise I would remove it.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
+ rightState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ r -> {
+ KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+ if (new Duration(r.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
Review comment:
Can you explain the use case of compareFn?
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
+ rightState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ r -> {
+ KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+ if (new Duration(r.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
+ c.output(KV.of(key, matchCandidate));
+ }
+ });
+ } else {
+ rightState.add(TimestampedValue.of(right, timestamp));
+ leftState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ l -> {
+ KV<V1, V2> matchCandidate = KV.of(l.getValue(), right);
+ if (new Duration(l.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
+ c.output(KV.of(key, matchCandidate));
+ }
+ });
+ }
+ }
+
+ @OnTimer("eviction")
+ public void onEviction(
+ @StateId("left") OrderedListState<V1> leftState,
+ @StateId("right") OrderedListState<V2> rightState,
+ @StateId("lastEviction") ValueState<Instant> lastEvictionState,
+ @Timestamp Instant ts) {
+ evictionTimerSet = false;
+ lastEvictionState.write(ts);
+ leftState.clearRange(new Instant(0L), ts);
Review comment:
Timestamps can be negative - use BoundedWindow.TIMESTAMP_MIN_VALUE
instead of new Instant(0L).
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
+ rightState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ r -> {
+ KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+ if (new Duration(r.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
+ c.output(KV.of(key, matchCandidate));
+ }
+ });
+ } else {
+ rightState.add(TimestampedValue.of(right, timestamp));
+ leftState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ l -> {
+ KV<V1, V2> matchCandidate = KV.of(l.getValue(), right);
+ if (new Duration(l.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
+ c.output(KV.of(key, matchCandidate));
+ }
+ });
+ }
+ }
+
+ @OnTimer("eviction")
+ public void onEviction(
+ @StateId("left") OrderedListState<V1> leftState,
+ @StateId("right") OrderedListState<V2> rightState,
+ @StateId("lastEviction") ValueState<Instant> lastEvictionState,
+ @Timestamp Instant ts) {
+ evictionTimerSet = false;
+ lastEvictionState.write(ts);
+ leftState.clearRange(new Instant(0L), ts);
+ rightState.clearRange(new Instant(0L), ts);
Review comment:
Don't you want ts.minus(temporalBound) to be the upper bound of the
eviction?
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
Review comment:
Using KVs just to encode a pair (i.e. V1 isn't a "key" and V2 isn't a
"value") reads slightly odd. There's nothing particularly wrong with it, but it
just reads a bit oddly.
You could also look at using
org.apache.beam.sdk.transforms.join.RawUnionValue for this, like CoGroupByKey
does.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
Review comment:
Did Beam not properly deduce a coder here on its own?
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
Review comment:
Why should users prefer innerJoin?
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
Review comment:
Instead of using ProcessContext, just add the following parameters to
processElement:
@Element KV<...> e,
OutputReceiver<KV<...>> output
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
+ rightState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
Review comment:
Thinking about this....
1. At least Dataflow will not effectively cache this read today,
especially since the read range keeps changing (though maybe we should fix this).
2. Executing the full join on every element can result in worst-case
O(n^2) behavior. If temporalBound isn't too large it might not be that bad.
I wonder whether, instead of doing this on every element, we should have
timers that fire at some configurable frequency (maybe default to once a
second), and output the joined elements in the timer. You could do this easily
using TimerMap - just round the timestamp to the next minute boundary, and make
that the timer key in the map.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
+ rightState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ r -> {
+ KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+ if (new Duration(r.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
Review comment:
It seems more obvious to me to write
if (r.getTimestamp().isAfter(timestamp.minus(temporalBound)))
since readRange already excludes the upper limit of the range.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
+ rightState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ r -> {
+ KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+ if (new Duration(r.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
+ c.output(KV.of(key, matchCandidate));
+ }
+ });
+ } else {
+ rightState.add(TimestampedValue.of(right, timestamp));
+ leftState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ l -> {
+ KV<V1, V2> matchCandidate = KV.of(l.getValue(), right);
+ if (new Duration(l.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
+ c.output(KV.of(key, matchCandidate));
+ }
+ });
+ }
+ }
+
+ @OnTimer("eviction")
+ public void onEviction(
+ @StateId("left") OrderedListState<V1> leftState,
+ @StateId("right") OrderedListState<V2> rightState,
+ @StateId("lastEviction") ValueState<Instant> lastEvictionState,
+ @Timestamp Instant ts) {
+ evictionTimerSet = false;
+ lastEvictionState.write(ts);
+ leftState.clearRange(new Instant(0L), ts);
+ rightState.clearRange(new Instant(0L), ts);
+ }
+ }
+
+ /**
+ * Inner joins two PCollection<KV>s that satisfy a temporal predicate.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded PCollections
in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows elements
+ * to be expired when they are irrelevant according to the event-time
watermark. This helps reduce
+ * the search space, storage, and memory requirements.
+ *
+ * @param <K> Join key type.
+ * @param <V1> Left element type in the left collection.
+ * @param <V2> Right element type in the right collection.
+ * @param name Name of the PTransform.
+ * @param leftCollection Left collection of the join.
+ * @param rightCollection Right collection of the join.
+ * @param temporalBound Time domain range used in the join predicate
(non-inclusive).
+ * @param compareFn Function used when comparing elements in the join
predicate.
+ */
+ public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> temporalInnerJoin(
Review comment:
I assume outer joins are for a later PR?
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
+ rightState
+ .readRange(timestamp.minus(temporalBound),
timestamp.plus(temporalBound))
+ .forEach(
+ r -> {
+ KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+ if (new Duration(r.getTimestamp(),
timestamp).abs().isShorterThan(temporalBound)
+ && compareFn.apply(matchCandidate)) {
+ c.output(KV.of(key, matchCandidate));
+ }
+ });
+ } else {
+ rightState.add(TimestampedValue.of(right, timestamp));
Review comment:
This is roughly the same code as the block above; it could be refactored into a
shared helper function.
##########
File path:
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
return leftCollection.apply(name, InnerJoin.with(rightCollection));
}
+ /**
+ * PTransform representing a temporal inner join of PCollection<KV>s.
+ *
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of the values for the right collection.
+ */
+ public static class TemporalInnerJoin<K, V1, V2>
+ extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1,
V2>>>> {
+ private final transient PCollection<KV<K, V2>> rightCollection;
+ private final Duration temporalBound;
+ private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+ private TemporalInnerJoin(
+ final PCollection<KV<K, V2>> rightCollection,
+ final Duration temporalBound,
+ final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.temporalBound = temporalBound;
+ this.rightCollection = rightCollection;
+ this.comparatorFn = compareFn;
+ }
+
+ /**
+ * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+ *
+ * <p>Similar to {@code innerJoin} but also supports unbounded
PCollections in the GlobalWindow.
+ * Join results will be produced eagerly as new elements are received,
regardless of windowing,
+ * however users should prefer {@code innerJoin} in most cases for better
throughput.
+ *
+ * <p>The non-inclusive {@code temporalBound}, used as part of the join
predicate, allows
+ * elements to be expired when they are irrelevant according to the
event-time watermark. This
+ * helps reduce the search space, storage, and memory requirements.
+ *
+ * @param rightCollection Right side collection of the join.
+ * @param temporalBound Duration used in the join predicate
(non-inclusive).
+ * @param compareFn Join predicate used for matching elements.
+ * @param <K> Type of the key for both collections.
+ * @param <V1> Type of the values for the left collection.
+ * @param <V2> Type of values for the right collection.
+ */
+ public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+ PCollection<KV<K, V2>> rightCollection,
+ Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ return new TemporalInnerJoin<>(rightCollection, temporalBound,
compareFn);
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>>
leftCollection) {
+ // left right
+ // tag-left tag-right (create union type)
+ // \ /
+ // flatten
+ // join
+
+ Coder<K> keyCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getKeyCoder();
+ Coder<V1> leftValueCoder = ((KvCoder<K, V1>)
leftCollection.getCoder()).getValueCoder();
+ Coder<V2> rightValueCoder = ((KvCoder<K, V2>)
rightCollection.getCoder()).getValueCoder();
+
+ PCollection<KV<K, KV<V1, V2>>> leftUnion =
+ leftCollection
+ .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1,
V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ PCollection<KV<K, KV<V1, V2>>> rightUnion =
+ rightCollection
+ .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K,
V1, V2>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(
+ NullableCoder.of(leftValueCoder),
NullableCoder.of(rightValueCoder))));
+
+ return PCollectionList.of(leftUnion)
+ .and(rightUnion)
+ .apply(Flatten.pCollections())
+ .apply(
+ "TemporalInnerJoinFn",
+ ParDo.of(
+ new TemporalInnerJoinFn<>(
+ leftValueCoder, rightValueCoder, temporalBound,
comparatorFn)));
+ }
+ }
+
+ private static class LeftUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+ return KV.of(element.getKey(), KV.of(element.getValue(), null));
+ }
+ }
+
+ private static class RightUnionTagFn<K, V1, V2>
+ extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+ @Override
+ public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+ return KV.of(element.getKey(), KV.of(null, element.getValue()));
+ }
+ }
+
+ private static class TemporalInnerJoinFn<K, V1, V2>
+ extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+ @StateId("left")
+ private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+ @StateId("right")
+ private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+ // Null only when uninitialized. After first element is received this will
always be non-null.
+ @StateId("lastEviction")
+ private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+ @TimerId("eviction")
+ private final TimerSpec evictionSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private final Duration temporalBound;
+ private final Duration evictionFrequency;
+ private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+ // Tracks the state of the eviction timer. Value is true when the timer
has been set and
+ // execution is waiting for the event time watermark to fire the timer
according to the
+ // evictionFrequency. False after the timer has been fired, so
processElement can set the timer
+ // using the previous firing event time.
+ private transient boolean evictionTimerSet;
+
+ @Setup
+ public void setup() {
+ evictionTimerSet = false;
+ }
+
+ protected TemporalInnerJoinFn(
+ final Coder<V1> leftCoder,
+ final Coder<V2> rightCoder,
+ final Duration temporalBound,
+ SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+ this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+ this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+ this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+ this.temporalBound = temporalBound;
+ this.compareFn = compareFn;
+ this.evictionFrequency =
+ temporalBound.getMillis() <= 4 ? Duration.millis(1) :
temporalBound.dividedBy(4);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+ @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+ @AlwaysFetched @StateId("lastEviction") ValueState<Instant>
lastEvictionState,
+ @Timestamp Instant timestamp,
+ @TimerId("eviction") Timer evictionTimer) {
+ Instant lastEviction = lastEvictionState.read();
+ if (lastEviction == null) {
+ // Initialize timer for the first time relatively since event time
watermark is unknown.
+ evictionTimerSet = true;
+ evictionTimer.offset(evictionFrequency).setRelative();
+ } else if (!evictionTimerSet) {
+ // Set timer using persisted event watermark from last timer firing
event time.
+ checkNotNull(lastEviction);
+ evictionTimerSet = true;
+ evictionTimer.set(lastEviction.plus(evictionFrequency));
+ }
+
+ KV<K, KV<V1, V2>> e = c.element();
+ K key = e.getKey();
+ V1 left = e.getValue().getKey();
+ V2 right = e.getValue().getValue();
+ if (left != null) {
+ leftState.add(TimestampedValue.of(left, timestamp));
Review comment:
We will want to add a watermark hold for this as well. This can only be
done by setting a timer, using withOutputTimestamp.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]