kennknowles commented on a change in pull request #15275:
URL: https://github.com/apache/beam/pull/15275#discussion_r726554652



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on 
two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the 
difference in event time.
+ * Currently only inner join is supported.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue
+public abstract class EventTimeEquiJoin<K, V1, V2>
+    extends PTransform<PCollection<KV<K, V1>>, PCollection<KV<K, Pair<V1, 
V2>>>> {
+
+  /* Where the output timestamp for each element is taken from. */
+  public enum OutputTimestampFrom {
+    FIRST_COLLECTION,
+    SECOND_COLLECTION,
+    MINIMUM_TIMESTAMP,
+    MAXIMUM_TIMESTAMP
+  }
+
+  /**
+   * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} 
inner join on two
+   * PCollections.
+   */
+  public static <K, V1, V2> EventTimeEquiJoin<K, V1, V2> innerJoin(
+      PCollection<KV<K, V2>> secondCollection) {
+    return new AutoValue_EventTimeEquiJoin.Builder<K, V1, V2>()
+        .setSecondCollection(secondCollection)
+        .setFirstCollectionValidFor(Duration.ZERO)
+        .setSecondCollectionValidFor(Duration.ZERO)
+        .setAllowedLateness(Duration.ZERO)
+        .setOutputTimestampFrom(OutputTimestampFrom.MINIMUM_TIMESTAMP)
+        .build();
+  }
+
+  abstract PCollection<KV<K, V2>> getSecondCollection();
+
+  abstract Duration getFirstCollectionValidFor();
+
+  abstract Duration getSecondCollectionValidFor();
+
+  abstract Duration getAllowedLateness();
+
+  abstract OutputTimestampFrom getOutputTimestampFrom();
+
+  abstract Builder<K, V1, V2> toBuilder();
+
+  @AutoValue.Builder
+  public abstract static class Builder<K, V1, V2> {
+    public abstract Builder<K, V1, V2> setSecondCollection(PCollection<KV<K, 
V2>> value);
+
+    public abstract Builder<K, V1, V2> setFirstCollectionValidFor(Duration 
value);
+
+    public abstract Builder<K, V1, V2> setSecondCollectionValidFor(Duration 
value);
+
+    public abstract Builder<K, V1, V2> setAllowedLateness(Duration value);
+
+    public abstract Builder<K, V1, V2> setOutputTimestampFrom(
+        OutputTimestampFrom outputTimestampFrom);
+
+    abstract EventTimeEquiJoin<K, V1, V2> build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches 
elements from the first and
+   * second collection with the same keys if their timestamps are within the 
given interval.
+   *
+   * @param interval the allowed difference between the timestamps to allow a 
match
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(Duration interval) {
+    return toBuilder()
+        .setFirstCollectionValidFor(interval)
+        .setSecondCollectionValidFor(interval)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches 
elements from the first and
+   * second collection with the same keys if the collection's element comes 
within the valid time
+   * range for the other collection.
+   *
+   * @param firstCollectionValidFor the valid time range for the first 
collection
+   * @param secondCollectionValidFor the valid time range for the second 
collection
+   */
+  public EventTimeEquiJoin<K, V1, V2> within(
+      Duration firstCollectionValidFor, Duration secondCollectionValidFor) {
+    return toBuilder()
+        .setFirstCollectionValidFor(firstCollectionValidFor)
+        .setSecondCollectionValidFor(secondCollectionValidFor)
+        .build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches 
elements from the first and
+   * second collection with the same keys and allows for late elements.
+   *
+   * @param allowedLateness the amount of time late elements are allowed.
+   */
+  public EventTimeEquiJoin<K, V1, V2> withAllowedLateness(Duration 
allowedLateness) {
+    checkArgument(
+        allowedLateness.isLongerThan(Duration.ZERO),
+        "Allowed lateness for EventTimeEquiJoin must be positive.");
+    return toBuilder().setAllowedLateness(allowedLateness).build();
+  }
+
+  /**
+   * Returns a {@code Impl<K, V1, V2>} {@code PTransform} that matches 
elements from the first and
+   * second collection with the same keys and allows for late elements.
+   *
+   * @param outputTimestampFrom where to pull the output timestamp from
+   */
+  public EventTimeEquiJoin<K, V1, V2> withOutputTimestampFrom(
+      OutputTimestampFrom outputTimestampFrom) {
+    return toBuilder().setOutputTimestampFrom(outputTimestampFrom).build();
+  }
+
+  @Override
+  public PCollection<KV<K, Pair<V1, V2>>> expand(PCollection<KV<K, V1>> input) 
{
+    Coder<K> keyCoder = JoinUtils.getKeyCoder(input);
+    Coder<V1> firstValueCoder = JoinUtils.getValueCoder(input);
+    Coder<V2> secondValueCoder = 
JoinUtils.getValueCoder(getSecondCollection());
+    UnionCoder unionCoder = UnionCoder.of(ImmutableList.of(firstValueCoder, 
secondValueCoder));
+    KvCoder<K, RawUnionValue> kvCoder = 
KvCoder.of(JoinUtils.getKeyCoder(input), unionCoder);
+    PCollectionList<KV<K, RawUnionValue>> union =
+        PCollectionList.of(JoinUtils.makeUnionTable(0, input, kvCoder))
+            .and(JoinUtils.makeUnionTable(1, getSecondCollection(), kvCoder));
+    return union
+        .apply("Flatten", Flatten.pCollections())
+        .apply(
+            "Join",
+            ParDo.of(
+                new EventTimeEquiJoinDoFn<>(
+                    firstValueCoder,
+                    secondValueCoder,
+                    getFirstCollectionValidFor(),
+                    getSecondCollectionValidFor(),
+                    getAllowedLateness(),
+                    getOutputTimestampFrom())))
+        .setCoder(KvCoder.of(keyCoder, PairCoder.<V1, V2>of(firstValueCoder, 
secondValueCoder)));
+  }
+
+  private static class EventTimeEquiJoinDoFn<K, V1, V2>
+      extends DoFn<KV<K, RawUnionValue>, KV<K, Pair<V1, V2>>> {
+    private static final int FIRST_TAG = 0;
+    private static final int SECOND_TAG = 1;
+
+    // Bucket cleanup timers into TIMER_BUCKET length buckets.
+    private static final long TIMER_BUCKET = 
Duration.standardMinutes(1).getMillis();
+
+    // How long elements in the first and second collection are valid (can be 
matched) for.
+    private final Duration firstCollectionValidFor;
+    private final Duration secondCollectionValidFor;
+
+    // How long past the watermark that late elements can show up.
+    private final Duration allowedLateness;
+
+    // How to generate the output timestamp.
+    private final OutputTimestampFrom outputTimestampFrom;
+
+    @StateId("firstItems")
+    private final StateSpec<OrderedListState<V1>> firstCollectionItems;
+
+    @StateId("secondItems")
+    private final StateSpec<OrderedListState<V2>> secondCollectionItems;
+
+    // Timestamp of the oldest element that has not already been cleaned up 
used to ensure we don't
+    // accept elements for timestamps we already cleaned up.
+    @StateId("oldestFirstTimestamp")
+    private final StateSpec<ValueState<Instant>> oldestFirstTimestamp;
+
+    @StateId("oldestSecondTimestamp")
+    private final StateSpec<ValueState<Instant>> oldestSecondTimestamp;
+
+    @TimerFamily("cleanupTimers")
+    private final TimerSpec cleanupTimers = 
TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Watermark holds for elements in the first collection.
+    @TimerFamily("firstCollectionHolds")
+    private final TimerSpec firstCollectionHolds = 
TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    // Watermark holds for elements in the second collection.
+    @TimerFamily("secondCollectionHolds")
+    private final TimerSpec secondCollectionHolds = 
TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
+
+    public EventTimeEquiJoinDoFn(
+        Coder<V1> firstValueCoder,
+        Coder<V2> secondValueCoder,
+        Duration firstValidFor,
+        Duration secondValidFor,
+        Duration allowedLateness,
+        OutputTimestampFrom outputTimestampFrom) {
+      this.firstCollectionValidFor = firstValidFor;
+      this.secondCollectionValidFor = secondValidFor;
+      this.allowedLateness = allowedLateness;
+      this.outputTimestampFrom = outputTimestampFrom;
+      firstCollectionItems = StateSpecs.orderedList(firstValueCoder);
+      secondCollectionItems = StateSpecs.orderedList(secondValueCoder);
+      oldestFirstTimestamp = StateSpecs.value();
+      oldestSecondTimestamp = StateSpecs.value();
+    }
+
+    @FunctionalInterface
+    private interface Output<T1, T2> {
+      void output(T1 one, T2 two, Instant tsOne, Instant tsTwo);
+    }
+
+    /** Adds a timer at the next bucket past time to fire at the bucket 
boundary. */
+    private Timer addTimer(TimerMap timers, Instant time) {
+      Instant nextBucketStart =
+          Instant.ofEpochMilli(time.getMillis() / TIMER_BUCKET * TIMER_BUCKET 
+ TIMER_BUCKET);
+      Timer timer = timers.get(Long.toString(nextBucketStart.getMillis()));
+      timer.set(nextBucketStart);
+      return timer;
+    }
+
+    private <ThisT, OtherT> void processHelper(
+        Output<ThisT, OtherT> output,
+        KV<K, RawUnionValue> element,
+        Instant ts,
+        OrderedListState<ThisT> thisCollection,
+        OrderedListState<OtherT> otherCollection,
+        ValueState<Instant> oldestTimestampState,

Review comment:
       I can see how having a specific timestamp queue state would make it 
easier to have all the methods we need in a single "state type". Interestingly 
enough, the original design for state (before all the annotations) made it 
pretty easy to define composite types of state, whereas now we really cannot.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/PairCoder.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** A {@link Coder} for {@link Pair}s that defers to underlying coders. */
+public class PairCoder<V1, V2> extends StructuredCoder<Pair<V1, V2>> {
+  private final Coder<V1> firstCoder;
+  private final Coder<V2> secondCoder;
+
+  private PairCoder(Coder<V1> firstCoder, Coder<V2> secondCoder) {
+    this.firstCoder = firstCoder;
+    this.secondCoder = secondCoder;
+  }
+
+  /** Returns a {@link PairCoder} for the given underlying value coders. */
+  public static <V1, V2> PairCoder<V1, V2> of(Coder<V1> firstCoder, Coder<V2> 
secondCoder) {
+    return new PairCoder<>(firstCoder, secondCoder);
+  }
+
+  @Override
+  public void encode(Pair<V1, V2> value, OutputStream outStream)
+      throws CoderException, IOException {
+    firstCoder.encode(value.getFirst(), outStream);

Review comment:
       Have we deprecated the coder "context" idea? Or do you just not want to 
apply it here? I would expect PairCoder to be identical to KvCoder, anyhow.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/EventTimeEquiJoin.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.join;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Returns a {@link PTransform} that performs a {@link EventTimeEquiJoin} on 
two PCollections. A
+ * {@link EventTimeEquiJoin} joins elements with equal keys bounded by the 
difference in event time.
+ * Currently only inner join is supported.
+ *
+ * <p>Example of performing a {@link EventTimeEquiJoin}:
+ *
+ * <pre>{@code
+ * PCollection<KV<K, V1>> pt1 = ...;
+ * PCollection<KV<K, V2>> pt2 = ...;
+ *
+ * PCollection<KV<K, Pair<V1, V2>> eventTimeEquiJoinCollection =
+ *   pt1.apply(EventTimeEquiJoin.<K, V1, V2>of(pt2));
+ *
+ * @param <K> the type of the keys in the input {@code PCollection}s
+ * @param <V1> the type of the value in the first {@code PCollection}
+ * @param <V2> the type of the value in the second {@code PCollection}
+ * </pre>
+ */
+@AutoValue

Review comment:
       `@Experimental`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to