tysonjh commented on a change in pull request #12915:
URL: https://github.com/apache/beam/pull/12915#discussion_r524898446



##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>

Review comment:
       Done. Went with `EventTimeBoundedEquijoin`.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.

Review comment:
       Removed.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;

Review comment:
       I think I need it, so I made it into a state, but let me know if you 
know how to get rid of it please. I want to be able to differentiate between 
these states: never initialized, initialized but needs updating, doesn't need 
updating and track the last time the timer triggered.
   
   What I'm doing here is:
   
     `lastEviction == null` when the timer has never been initialized, use the 
first record received to set the timer
     `lastEviction != null` when the timer has been initialized, rely on 
`evictionTimerSet`
   
     `evictionTimerSet == true` don't update it, allow the trigger to reset the 
variable to `false`
     `evictionTimerSet == false` set the timer on next record to 
`lastEvictionTime + evictionFrequency`.
   
   I wouldn't need to do this if I could set timers in the `@OnTimer` method or 
if I could set some kind of looping trigger.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,

Review comment:
       Done.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> 
lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant lastEviction = lastEvictionState.read();
+      if (lastEviction == null) {
+        // Initialize timer for the first time relatively since event time 
watermark is unknown.
+        evictionTimerSet = true;
+        evictionTimer.offset(evictionFrequency).setRelative();
+      } else if (!evictionTimerSet) {
+        // Set timer using persisted event watermark from last timer firing 
event time.
+        checkNotNull(lastEviction);
+        evictionTimerSet = true;
+        evictionTimer.set(lastEviction.plus(evictionFrequency));
+      }
+
+      KV<K, KV<V1, V2>> e = c.element();
+      K key = e.getKey();
+      V1 left = e.getValue().getKey();
+      V2 right = e.getValue().getValue();
+      if (left != null) {
+        leftState.add(TimestampedValue.of(left, timestamp));
+        rightState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))
+            .forEach(
+                r -> {
+                  KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+                  if (new Duration(r.getTimestamp(), 
timestamp).abs().isShorterThan(temporalBound)

Review comment:
       This wouldn't work. For example,
   
   temporalBound = 3
   r.getTimestamp() = 5
   timestamp = 4
   
   5.isAfter(4-3) == false
   
   but this example should produce a join result since difference is within the 
temporalBound (i.e. 1.isShorterThan(3)).
   

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> 
lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant lastEviction = lastEvictionState.read();
+      if (lastEviction == null) {
+        // Initialize timer for the first time relatively since event time 
watermark is unknown.
+        evictionTimerSet = true;
+        evictionTimer.offset(evictionFrequency).setRelative();
+      } else if (!evictionTimerSet) {
+        // Set timer using persisted event watermark from last timer firing 
event time.
+        checkNotNull(lastEviction);
+        evictionTimerSet = true;
+        evictionTimer.set(lastEviction.plus(evictionFrequency));
+      }
+
+      KV<K, KV<V1, V2>> e = c.element();
+      K key = e.getKey();
+      V1 left = e.getValue().getKey();
+      V2 right = e.getValue().getValue();
+      if (left != null) {
+        leftState.add(TimestampedValue.of(left, timestamp));
+        rightState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))
+            .forEach(
+                r -> {
+                  KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+                  if (new Duration(r.getTimestamp(), 
timestamp).abs().isShorterThan(temporalBound)
+                      && compareFn.apply(matchCandidate)) {

Review comment:
       Removed from other discussions.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>

Review comment:
       AutoValue doesn't support transient fields so the pcollection causes a 
problem.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.

Review comment:
       Done.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =

Review comment:
       I was following along with the other join methods in this class that use 
KVs. Using a `RawUnionValue` means I would need to create a results class (like 
cGBK) or un-tag the results afterwards just to produce a `KV<K, KV<V1, V2>>` 
like the other joins.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));

Review comment:
       Correct, it was unable to infer one.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,

Review comment:
       Done.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())

Review comment:
       Done.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> 
lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant lastEviction = lastEvictionState.read();
+      if (lastEviction == null) {
+        // Initialize timer for the first time relatively since event time 
watermark is unknown.
+        evictionTimerSet = true;
+        evictionTimer.offset(evictionFrequency).setRelative();
+      } else if (!evictionTimerSet) {
+        // Set timer using persisted event watermark from last timer firing 
event time.
+        checkNotNull(lastEviction);
+        evictionTimerSet = true;
+        evictionTimer.set(lastEviction.plus(evictionFrequency));
+      }
+
+      KV<K, KV<V1, V2>> e = c.element();
+      K key = e.getKey();
+      V1 left = e.getValue().getKey();
+      V2 right = e.getValue().getValue();
+      if (left != null) {
+        leftState.add(TimestampedValue.of(left, timestamp));
+        rightState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))
+            .forEach(
+                r -> {
+                  KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+                  if (new Duration(r.getTimestamp(), 
timestamp).abs().isShorterThan(temporalBound)
+                      && compareFn.apply(matchCandidate)) {
+                    c.output(KV.of(key, matchCandidate));
+                  }
+                });
+      } else {
+        rightState.add(TimestampedValue.of(right, timestamp));
+        leftState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))
+            .forEach(
+                l -> {
+                  KV<V1, V2> matchCandidate = KV.of(l.getValue(), right);
+                  if (new Duration(l.getTimestamp(), 
timestamp).abs().isShorterThan(temporalBound)
+                      && compareFn.apply(matchCandidate)) {
+                    c.output(KV.of(key, matchCandidate));
+                  }
+                });
+      }
+    }
+
+    @OnTimer("eviction")
+    public void onEviction(
+        @StateId("left") OrderedListState<V1> leftState,
+        @StateId("right") OrderedListState<V2> rightState,
+        @StateId("lastEviction") ValueState<Instant> lastEvictionState,
+        @Timestamp Instant ts) {
+      evictionTimerSet = false;
+      lastEvictionState.write(ts);
+      leftState.clearRange(new Instant(0L), ts);
+      rightState.clearRange(new Instant(0L), ts);

Review comment:
       Yes, you're right. I want to test the eviction but I don't know how. 
When I try to use two TestStreams with various watermark updates it doesn't 
seem to trigger. Is there a way I can test this behavior?

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> 
lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant lastEviction = lastEvictionState.read();
+      if (lastEviction == null) {
+        // Initialize timer for the first time relatively since event time 
watermark is unknown.
+        evictionTimerSet = true;
+        evictionTimer.offset(evictionFrequency).setRelative();
+      } else if (!evictionTimerSet) {
+        // Set timer using persisted event watermark from last timer firing 
event time.
+        checkNotNull(lastEviction);
+        evictionTimerSet = true;
+        evictionTimer.set(lastEviction.plus(evictionFrequency));
+      }
+
+      KV<K, KV<V1, V2>> e = c.element();
+      K key = e.getKey();
+      V1 left = e.getValue().getKey();
+      V2 right = e.getValue().getValue();
+      if (left != null) {
+        leftState.add(TimestampedValue.of(left, timestamp));

Review comment:
       Could you elaborate a bit please? I don't understand why we want the 
hold, or what it accomplishes. The docs are a bit tricky to follow regarding 
this.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> 
lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant lastEviction = lastEvictionState.read();
+      if (lastEviction == null) {
+        // Initialize timer for the first time relatively since event time 
watermark is unknown.
+        evictionTimerSet = true;
+        evictionTimer.offset(evictionFrequency).setRelative();
+      } else if (!evictionTimerSet) {
+        // Set timer using persisted event watermark from last timer firing 
event time.
+        checkNotNull(lastEviction);
+        evictionTimerSet = true;
+        evictionTimer.set(lastEviction.plus(evictionFrequency));
+      }
+
+      KV<K, KV<V1, V2>> e = c.element();
+      K key = e.getKey();
+      V1 left = e.getValue().getKey();
+      V2 right = e.getValue().getValue();
+      if (left != null) {
+        leftState.add(TimestampedValue.of(left, timestamp));
+        rightState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))

Review comment:
       Will the timer family reduce the worst case? The O(n^2) comes from 
searching through the state on each input, won't that still be required for 
finding the 'joined elements' to output in the timer?

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> 
lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant lastEviction = lastEvictionState.read();
+      if (lastEviction == null) {
+        // Initialize timer for the first time relatively since event time 
watermark is unknown.
+        evictionTimerSet = true;
+        evictionTimer.offset(evictionFrequency).setRelative();
+      } else if (!evictionTimerSet) {
+        // Set timer using persisted event watermark from last timer firing 
event time.
+        checkNotNull(lastEviction);
+        evictionTimerSet = true;
+        evictionTimer.set(lastEviction.plus(evictionFrequency));
+      }
+
+      KV<K, KV<V1, V2>> e = c.element();
+      K key = e.getKey();
+      V1 left = e.getValue().getKey();
+      V2 right = e.getValue().getValue();
+      if (left != null) {
+        leftState.add(TimestampedValue.of(left, timestamp));
+        rightState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))
+            .forEach(
+                r -> {
+                  KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+                  if (new Duration(r.getTimestamp(), 
timestamp).abs().isShorterThan(temporalBound)
+                      && compareFn.apply(matchCandidate)) {
+                    c.output(KV.of(key, matchCandidate));
+                  }
+                });
+      } else {
+        rightState.add(TimestampedValue.of(right, timestamp));

Review comment:
       The refactored function ends up looking pretty messy because of the 
types and the ordering required for creating them. Creating a `KV` causes 
problems, it requires passing a lambda to construct the KV with positional 
arguments (e.g. left vs. right).
   
   It ends up looking pretty nasty, not worth the savings in lines IMO:
   
   ```java
   void findAndOutputMatches(Instant timestamp, T searchValue, 
OrderedListState<U> searchState,
            BiFunction<U, T, KV<K, KV<V1, V2>>> kvCreator, OutputReceiver<KV<K, 
KV<V1, V2>>> outputReceiver)
   ```
   
   I pulled out the gnarly if condition to a function though, that cleans it up 
a bit.

##########
File path: 
sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
##########
@@ -350,6 +368,255 @@ public void processElement(ProcessContext c) {
     return leftCollection.apply(name, InnerJoin.with(rightCollection));
   }
 
+  /**
+   * PTransform representing a temporal inner join of PCollection<KV>s.
+   *
+   * @param <K> Type of the key for both collections.
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   */
+  public static class TemporalInnerJoin<K, V1, V2>
+      extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, KV<V1, 
V2>>>> {
+    private final transient PCollection<KV<K, V2>> rightCollection;
+    private final Duration temporalBound;
+    private final SimpleFunction<KV<V1, V2>, Boolean> comparatorFn;
+
+    private TemporalInnerJoin(
+        final PCollection<KV<K, V2>> rightCollection,
+        final Duration temporalBound,
+        final SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.temporalBound = temporalBound;
+      this.rightCollection = rightCollection;
+      this.comparatorFn = compareFn;
+    }
+
+    /**
+     * Returns a TemporalInnerJoin PTransform that joins two PCollection<KV>s.
+     *
+     * <p>Similar to {@code innerJoin} but also supports unbounded 
PCollections in the GlobalWindow.
+     * Join results will be produced eagerly as new elements are received, 
regardless of windowing,
+     * however users should prefer {@code innerJoin} in most cases for better 
throughput.
+     *
+     * <p>The non-inclusive {@code temporalBound}, used as part of the join 
predicate, allows
+     * elements to be expired when they are irrelevant according to the 
event-time watermark. This
+     * helps reduce the search space, storage, and memory requirements.
+     *
+     * @param rightCollection Right side collection of the join.
+     * @param temporalBound Duration used in the join predicate 
(non-inclusive).
+     * @param compareFn Join predicate used for matching elements.
+     * @param <K> Type of the key for both collections.
+     * @param <V1> Type of the values for the left collection.
+     * @param <V2> Type of values for the right collection.
+     */
+    public static <K, V1, V2> TemporalInnerJoin<K, V1, V2> with(
+        PCollection<KV<K, V2>> rightCollection,
+        Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      return new TemporalInnerJoin<>(rightCollection, temporalBound, 
compareFn);
+    }
+
+    @Override
+    public PCollection<KV<K, KV<V1, V2>>> expand(PCollection<KV<K, V1>> 
leftCollection) {
+      // left        right
+      // tag-left    tag-right (create union type)
+      //   \         /
+      //     flatten
+      //     join
+
+      Coder<K> keyCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getKeyCoder();
+      Coder<V1> leftValueCoder = ((KvCoder<K, V1>) 
leftCollection.getCoder()).getValueCoder();
+      Coder<V2> rightValueCoder = ((KvCoder<K, V2>) 
rightCollection.getCoder()).getValueCoder();
+
+      PCollection<KV<K, KV<V1, V2>>> leftUnion =
+          leftCollection
+              .apply("LeftUnionTag", MapElements.via(new LeftUnionTagFn<K, V1, 
V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      PCollection<KV<K, KV<V1, V2>>> rightUnion =
+          rightCollection
+              .apply("RightUnionTag", MapElements.via(new RightUnionTagFn<K, 
V1, V2>()))
+              .setCoder(
+                  KvCoder.of(
+                      keyCoder,
+                      KvCoder.of(
+                          NullableCoder.of(leftValueCoder), 
NullableCoder.of(rightValueCoder))));
+
+      return PCollectionList.of(leftUnion)
+          .and(rightUnion)
+          .apply(Flatten.pCollections())
+          .apply(
+              "TemporalInnerJoinFn",
+              ParDo.of(
+                  new TemporalInnerJoinFn<>(
+                      leftValueCoder, rightValueCoder, temporalBound, 
comparatorFn)));
+    }
+  }
+
+  private static class LeftUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V1>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V1> element) {
+      return KV.of(element.getKey(), KV.of(element.getValue(), null));
+    }
+  }
+
+  private static class RightUnionTagFn<K, V1, V2>
+      extends SimpleFunction<KV<K, V2>, KV<K, KV<V1, V2>>> {
+    @Override
+    public KV<K, KV<V1, V2>> apply(KV<K, V2> element) {
+      return KV.of(element.getKey(), KV.of(null, element.getValue()));
+    }
+  }
+
+  private static class TemporalInnerJoinFn<K, V1, V2>
+      extends DoFn<KV<K, KV<V1, V2>>, KV<K, KV<V1, V2>>> {
+
+    @StateId("left")
+    private final StateSpec<OrderedListState<V1>> leftStateSpec;
+
+    @StateId("right")
+    private final StateSpec<OrderedListState<V2>> rightStateSpec;
+
+    // Null only when uninitialized. After first element is received this will 
always be non-null.
+    @StateId("lastEviction")
+    private final StateSpec<ValueState<Instant>> lastEvictionStateSpec;
+
+    @TimerId("eviction")
+    private final TimerSpec evictionSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final Duration temporalBound;
+    private final Duration evictionFrequency;
+    private final SimpleFunction<KV<V1, V2>, Boolean> compareFn;
+
+    // Tracks the state of the eviction timer. Value is true when the timer 
has been set and
+    // execution is waiting for the event time watermark to fire the timer 
according to the
+    // evictionFrequency. False after the timer has been fired, so 
processElement can set the timer
+    // using the previous firing event time.
+    private transient boolean evictionTimerSet;
+
+    @Setup
+    public void setup() {
+      evictionTimerSet = false;
+    }
+
+    protected TemporalInnerJoinFn(
+        final Coder<V1> leftCoder,
+        final Coder<V2> rightCoder,
+        final Duration temporalBound,
+        SimpleFunction<KV<V1, V2>, Boolean> compareFn) {
+      this.leftStateSpec = StateSpecs.orderedList(leftCoder);
+      this.rightStateSpec = StateSpecs.orderedList(rightCoder);
+      this.lastEvictionStateSpec = StateSpecs.value(InstantCoder.of());
+      this.temporalBound = temporalBound;
+      this.compareFn = compareFn;
+      this.evictionFrequency =
+          temporalBound.getMillis() <= 4 ? Duration.millis(1) : 
temporalBound.dividedBy(4);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @AlwaysFetched @StateId("left") OrderedListState<V1> leftState,
+        @AlwaysFetched @StateId("right") OrderedListState<V2> rightState,
+        @AlwaysFetched @StateId("lastEviction") ValueState<Instant> 
lastEvictionState,
+        @Timestamp Instant timestamp,
+        @TimerId("eviction") Timer evictionTimer) {
+      Instant lastEviction = lastEvictionState.read();
+      if (lastEviction == null) {
+        // Initialize timer for the first time relatively since event time 
watermark is unknown.
+        evictionTimerSet = true;
+        evictionTimer.offset(evictionFrequency).setRelative();
+      } else if (!evictionTimerSet) {
+        // Set timer using persisted event watermark from last timer firing 
event time.
+        checkNotNull(lastEviction);
+        evictionTimerSet = true;
+        evictionTimer.set(lastEviction.plus(evictionFrequency));
+      }
+
+      KV<K, KV<V1, V2>> e = c.element();
+      K key = e.getKey();
+      V1 left = e.getValue().getKey();
+      V2 right = e.getValue().getValue();
+      if (left != null) {
+        leftState.add(TimestampedValue.of(left, timestamp));
+        rightState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))
+            .forEach(
+                r -> {
+                  KV<V1, V2> matchCandidate = KV.of(left, r.getValue());
+                  if (new Duration(r.getTimestamp(), 
timestamp).abs().isShorterThan(temporalBound)
+                      && compareFn.apply(matchCandidate)) {
+                    c.output(KV.of(key, matchCandidate));
+                  }
+                });
+      } else {
+        rightState.add(TimestampedValue.of(right, timestamp));
+        leftState
+            .readRange(timestamp.minus(temporalBound), 
timestamp.plus(temporalBound))
+            .forEach(
+                l -> {
+                  KV<V1, V2> matchCandidate = KV.of(l.getValue(), right);
+                  if (new Duration(l.getTimestamp(), 
timestamp).abs().isShorterThan(temporalBound)
+                      && compareFn.apply(matchCandidate)) {
+                    c.output(KV.of(key, matchCandidate));
+                  }
+                });
+      }
+    }
+
+    @OnTimer("eviction")
+    public void onEviction(
+        @StateId("left") OrderedListState<V1> leftState,
+        @StateId("right") OrderedListState<V2> rightState,
+        @StateId("lastEviction") ValueState<Instant> lastEvictionState,
+        @Timestamp Instant ts) {
+      evictionTimerSet = false;
+      lastEvictionState.write(ts);
+      leftState.clearRange(new Instant(0L), ts);

Review comment:
       Didn't know that, thanks. Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to