reuvenlax commented on a change in pull request #15786:
URL: https://github.com/apache/beam/pull/15786#discussion_r771792474



##########
File path: 
sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/FillGaps.java
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.timeseries;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.SortedMapCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.transforms.WithKeys;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.TimerMap;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Fill gaps in timeseries. Values are expected to have Beam schemas 
registered.
+ *
+ * <p>This transform views the original PCollection as a collection of timeseries, each with a different key. The
+ * key to be used and the timeseries bucket size are both specified in the {@link #of} creation method. Multiple
+ * fields can be specified for the key - the key extracted will be a composite 
of all of them. Any elements in the
+ * original {@link PCollection} will appear unchanged in the output 
PCollection, with timestamp and window unchanged.
+ * Any gaps in timeseries (i.e. buckets with no elements) will be filled in 
the output PCollection with a single element
+ * (by default the latest element seen or propagated into the previous 
bucket). The timestamp of the filled element is
+ * the end of the bucket, and the original PCollection's window function is 
used to assign it to a window.
+ *
+ *
+ * <p>Example usage: the following code views each user,country pair in the 
input {@link PCollection} as a timeseries
+ * with bucket size one second. If any of these timeseries has a bucket with no elements, then the latest element from
+ * the previous bucket (i.e. the one with the largest timestamp) will be propagated into the missing bucket. If there
+ * are multiple missing buckets, then they all will be filled up to 1 hour - 
the maximum gap size specified in
+ * {@link #withMaxGapFillBuckets}.
+ *
+ * <pre>{@code PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ *   input.apply("fillGaps",
+ *      FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ *        .withMaxGapFillBuckets(3600L));
+ *  gapFilled.apply(MySink.create());
+ *     }</pre>
+ *
+ * <p>By default, the latest element from the previous bucket is propagated 
into missing buckets. The user can override
+ * this using the {@link #withMergeFunction} method. Several built-in merge functions are provided -
+ * {@link #keepLatest()} (the default), {@link #keepEarliest()}, and {@link #keepNull()}.
+ *
+ * <p>Sometimes elements need to be modified before being propagated into a 
missing bucket. For example, consider the
+ * following element type containing a timestamp:
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * class MyType {
+ *   MyData data;
+ *   Instant timestamp;
+ *   @SchemaCreate
+ *   MyType(MyData data, Instant timestamp) {
+ *       this.data = data;
+ *       this.timestamp = timestamp;
+ *   }
+ * }}</pre>
+ *
+ * <p>The element's timestamp should always be contained in its current timeseries bucket, so the element needs to be
+ * modified when propagated to a new bucket. This can be done using the {@link #withPropagateFunction} method, as
+ * follows:
+ *
+ * <pre>{@code PCollection<MyType> input = readInput();
+ * PCollection<MyType> gapFilled =
+ *   input.apply("fillGaps",
+ *      FillGaps.of(Duration.standardSeconds(1), "userId", "country")
+ *        .withPropagateFunction(p -> new MyType(p.getValue().getValue().data, 
p.getNextWindow().maxTimestamp()))
+ *        .withMaxGapFillBuckets(360L));
+ *  gapFilled.apply(MySink.create());
+ *  }</pre>
+ */
+@AutoValue
+public abstract class FillGaps<ValueT>
+    extends PTransform<PCollection<ValueT>, PCollection<ValueT>> {
+  /**
+   * Merge function for {@link #withMergeFunction} that keeps whichever of the two elements has the
+   * later timestamp. If the timestamps are equal, the second argument is kept.
+   */
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepLatest() {
+    return (first, second) -> {
+      boolean firstIsNewer = first.getTimestamp().isAfter(second.getTimestamp());
+      if (firstIsNewer) {
+        return first;
+      }
+      return second;
+    };
+  }
+
+  /**
+   * Merge function for {@link #withMergeFunction} that keeps whichever of the two elements has the
+   * earlier timestamp. If the timestamps are equal, the first argument is kept.
+   */
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepEarliest() {
+    return (first, second) -> {
+      // Only a strictly later first element loses to the second one.
+      if (first.getTimestamp().isAfter(second.getTimestamp())) {
+        return second;
+      }
+      return first;
+    };
+  }
+
+  /**
+   * Merge function for {@link #withMergeFunction} that ignores both inputs and always produces
+   * {@code null}.
+   */
+  @SuppressWarnings("nullness")
+  public static <ValueT>
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, TimestampedValue<ValueT>>
+          keepNull() {
+    return (ignoredFirst, ignoredSecond) -> {
+      return null;
+    };
+  }
+
+  /**
+   * Argument passed to the function supplied to withPropagateFunction: a timestamped value
+   * together with two windows.
+   *
+   * <p>NOTE(review): accessor declaration order is significant for the AutoValue-generated
+   * implementation; do not reorder.
+   */
+  @AutoValue
+  public abstract static class PropagateData<ValueT> {
+    /** The timestamped element handed to the propagate function. */
+    public abstract TimestampedValue<ValueT> getValue();
+
+    /** Presumably the window of the bucket the value is propagated from - TODO confirm. */
+    public abstract BoundedWindow getPreviousWindow();
+
+    /** Presumably the window of the bucket the value is propagated into - TODO confirm. */
+    public abstract BoundedWindow getNextWindow();
+  }
+
+  /** Bucket duration of each timeseries; {@code of} sets it from its duration argument. */
+  abstract Duration getTimeseriesBucketDuration();
+
+  /** Maximum gap-fill bucket count; {@code of} defaults it to {@code Long.MAX_VALUE}. */
+  abstract Long getMaxGapFillBuckets();
+
+  /** Stop time; {@code of} defaults it to {@code BoundedWindow.TIMESTAMP_MAX_VALUE}. */
+  abstract Instant getStopTime();
+
+  /** Descriptor of the field(s) used as the timeseries key. */
+  abstract FieldAccessDescriptor getKeyDescriptor();
+
+  /** Function that merges two timestamped values; {@code of} defaults it to {@code keepLatest()}. */
+  abstract SerializableBiFunction<
+          TimestampedValue<ValueT>, TimestampedValue<ValueT>, 
TimestampedValue<ValueT>>
+      getMergeValues();
+
+  /** Optional propagate function; {@code of} does not set it, so it may be null. */
+  @Nullable
+  abstract SerializableFunction<PropagateData<ValueT>, ValueT> 
getPropagateFunction();
+
+  /** Returns a builder used by the {@code with*} methods to derive a modified copy. */
+  abstract Builder<ValueT> toBuilder();
+
+  /**
+   * AutoValue builder for {@link FillGaps}; it declares one setter per property plus {@code build()}.
+   * Each setter returns the builder so calls can be chained.
+   */
+  @AutoValue.Builder
+  abstract static class Builder<ValueT> {
+    /** Sets the timeseries bucket duration. */
+    abstract Builder<ValueT> setTimeseriesBucketDuration(Duration value);
+
+    /** Sets the maximum gap-fill bucket count. */
+    abstract Builder<ValueT> setMaxGapFillBuckets(Long value);
+
+    /** Sets the stop time. */
+    abstract Builder<ValueT> setStopTime(Instant value);
+
+    /** Sets the descriptor of the key field(s). */
+    abstract Builder<ValueT> setKeyDescriptor(FieldAccessDescriptor 
keyDescriptor);
+
+    /** Sets the function that merges two timestamped values into one. */
+    abstract Builder<ValueT> setMergeValues(
+        SerializableBiFunction<
+                TimestampedValue<ValueT>, TimestampedValue<ValueT>, 
TimestampedValue<ValueT>>
+            mergeValues);
+
+    /** Sets the optional propagate function; null is accepted. */
+    abstract Builder<ValueT> setPropagateFunction(
+        @Nullable SerializableFunction<PropagateData<ValueT>, ValueT> 
propagateFunction);
+
+    /** Builds the configured {@link FillGaps} instance. */
+    abstract FillGaps<ValueT> build();
+  }
+
+  /** Construct the transform for the given duration and key fields. */
+  public static <ValueT> FillGaps<ValueT> of(Duration windowDuration, 
String... keys) {
+    return of(windowDuration, FieldAccessDescriptor.withFieldNames(keys));
+  }
+
+  /**
+   * Creates the transform for the given bucket duration and key descriptor, using default
+   * settings for everything else.
+   *
+   * @param windowDuration duration of each timeseries bucket
+   * @param keyDescriptor descriptor of the field(s) that form the timeseries key
+   */
+  public static <ValueT> FillGaps<ValueT> of(
+      Duration windowDuration, FieldAccessDescriptor keyDescriptor) {
+    Builder<ValueT> builder = new AutoValue_FillGaps.Builder<ValueT>();
+    // Caller-supplied configuration.
+    builder = builder.setTimeseriesBucketDuration(windowDuration).setKeyDescriptor(keyDescriptor);
+    // Defaults: unbounded gap fill, no stop time before the max timestamp, keep the latest value.
+    builder =
+        builder
+            .setMaxGapFillBuckets(Long.MAX_VALUE)
+            .setStopTime(BoundedWindow.TIMESTAMP_MAX_VALUE)
+            .setMergeValues(keepLatest());
+    return builder.build();
+  }
+
+  /* The max gap duration that will be filled. The transform will stop filling 
timeseries buckets after this duration. */
+  FillGaps<ValueT> withMaxGapFillBuckets(Long value) {
+    return toBuilder().setMaxGapFillBuckets(value).build();
+  }
+
+  /* A hard (event-time) stop time for the transform. */
+  FillGaps<ValueT> withStopTime(Instant stopTime) {
+    return toBuilder().setStopTime(stopTime).build();
+  }
+
+  /**
+   * If there are multiple values in a single timeseries bucket, this function 
is used to specify
+   * what to propagate to the next bucket. If not specified, then the value 
with the latest
+   * timestamp will be propagated.
+   */
+  FillGaps<ValueT> withMergeFunction(
+      SerializableBiFunction<
+              TimestampedValue<ValueT>, TimestampedValue<ValueT>, 
TimestampedValue<ValueT>>
+          mergeFunction) {
+    return toBuilder().setMergeValues(mergeFunction).build();
+  }
+
+  /**

Review comment:
       you mean the timestamp in the record itself, not the Beam timestamp? IMO 
seems premature to add this helper.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to