dilnazanlid commented on code in PR #31379:
URL: https://github.com/apache/beam/pull/31379#discussion_r1636107571


##########
sdks/java/extensions/histogram/src/main/java/org/apache/beam/sdk/extensions/histogram/Histogram.java:
##########
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.histogram;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.VarInt;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
+import org.apache.commons.lang3.ArrayUtils;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * A histogram transform with a combiner that efficiently constructs linear, 
exponential or explicit
+ * histograms from large datasets of input data. Bucket bounds can be 
specified using the {@link
+ * BucketBounds} class.
+ */
+public class Histogram {
+
+  /** Not meant to be instantiated; the transforms are obtained via the static factory methods. */
+  private Histogram() {
+    // do not instantiate
+  }
+
+  /**
+   * Returns a {@code PTransform} that takes a {@code PCollection<T>} and returns a {@code
+   * PCollection<List<Long>>} with a single element per window. The values of this list represent
+   * the number of elements within each bucket of a histogram, as defined by {@link BucketBounds}.
+   * The first and the last elements of the list are numbers of elements in underflow and overflow
+   * buckets.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<Double> pc = ...;
+   * PCollection<List<Long>> bucketCounts =
+   *     pc.apply(Histogram.globally(BucketBounds.linear(1.0, 2.0, 100)));
+   *
+   * }</pre>
+   *
+   * @param <T> the type of the elements in the input {@code PCollection}
+   * @param bucketBounds the instance of the {@link BucketBounds} class with desired parameters of
+   *     the histogram.
+   * @return a {@code Combine.Globally} transform wrapping the {@code HistogramCombineFn} created
+   *     for {@code bucketBounds}.
+   */
+  public static <T extends Number> Combine.Globally<T, List<Long>> globally(
+      BucketBounds bucketBounds) {
+    return Combine.globally(HistogramCombineFn.create(bucketBounds));
+  }
+
+  /**
+   * Returns a {@code PTransform} that takes a {@code PCollection<KV<K, V>>} and returns a {@code
+   * PCollection<KV<K, List<Long>>>} that contains an output element mapping each distinct key in
+   * the input {@code PCollection} to a {@code List}. The values of this list represent the number
+   * of elements within each bucket of a histogram, as defined by {@link BucketBounds}. The first
+   * and the last elements of the list are numbers of elements in underflow and overflow buckets.
+   *
+   * <p>Example of use:
+   *
+   * <pre>{@code
+   * PCollection<KV<String, Integer>> pc = ...;
+   * PCollection<KV<String, List<Long>>> bucketCounts =
+   *     pc.apply(Histogram.perKey(BucketBounds.linear(1.0, 2.0, 100)));
+   *
+   * }</pre>
+   *
+   * @param <K> the type of the keys in the input and output {@code PCollection}s
+   * @param <V> the type of the values in the input {@code PCollection}
+   * @param bucketBounds the instance of the {@link BucketBounds} class with desired parameters of
+   *     the histogram.
+   * @return a {@code Combine.PerKey} transform wrapping the {@code HistogramCombineFn} created
+   *     for {@code bucketBounds}.
+   */
+  public static <K, V extends Number> Combine.PerKey<K, V, List<Long>> perKey(
+      BucketBounds bucketBounds) {
+    return Combine.perKey(HistogramCombineFn.create(bucketBounds));
+  }
+
+  /**
+   * Defines the bounds for histogram buckets.
+   *
+   * <p>Use the provided static factory methods to create new instances of 
{@link BucketBounds}.
+   */
+  @AutoValue
+  public abstract static class BucketBounds {
+
+    // Package-private on purpose: instances are meant to be created through the static factory
+    // methods of this class rather than by calling a constructor directly.
+    BucketBounds() {}
+
+    /**
+     * Returns the bucket boundary values of the histogram. The factory methods visible in this
+     * class populate the list with {@code numBoundedBuckets + 1} computed bounds.
+     */
+    public abstract List<Double> getBounds();
+
+    /** Returns the setting that defines whether lower or upper bucket bounds are inclusive. */
+    public abstract BoundsInclusivity getBoundsInclusivity();
+
+    /**
+     * Creates {@link BucketBounds} for an exponential histogram, computing every bound from the
+     * supplied parameters.
+     *
+     * <p>With BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE, the list returned by
+     * the HistogramCombineFn combiner holds the number of elements in these buckets:
+     *
+     * <pre>
+     * 0-th: (-inf, scale) - underflow bucket
+     * 1-st: [scale, scale * growthFactor)
+     * 2-nd: [scale * growthFactor, scale * growthFactor^2)
+     * ...
+     * i-th: [scale * growthFactor^(i-1), scale * growthFactor^i)
+     * ...
+     * numBoundedBuckets: [scale * growthFactor^(numBoundedBuckets-1), scale *
+     * growthFactor^numBoundedBuckets)
+     * numBoundedBuckets + 1: [scale * growthFactor^numBoundedBuckets, +inf) - overflow bucket.
+     * </pre>
+     *
+     * <p>With BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE, the list returned by
+     * the HistogramCombineFn combiner holds the number of elements in these buckets:
+     *
+     * <pre>
+     * 0-th: (-inf, scale] - underflow bucket
+     * 1-st: (scale, scale * growthFactor]
+     * 2-nd: (scale * growthFactor, scale * growthFactor^2]
+     * ...
+     * i-th: (scale * growthFactor^(i-1), scale * growthFactor^i]
+     * ...
+     * numBoundedBuckets: (scale * growthFactor^(numBoundedBuckets-1), scale *
+     * growthFactor^numBoundedBuckets]
+     * numBoundedBuckets + 1: (scale * growthFactor^numBoundedBuckets, +inf) - overflow bucket.
+     * </pre>
+     *
+     * @param scale lower bound of the first bounded bucket; must be positive.
+     * @param growthFactor factor by which successive bounds grow; must be greater than 1.0.
+     * @param numBoundedBuckets total number of bounded buckets in the histogram; must be greater
+     *     than zero and at most {@code Integer.MAX_VALUE - 2}.
+     * @param boundsInclusivity enum value selecting whether the lower or the upper bound of each
+     *     bucket is inclusive.
+     * @throws IllegalArgumentException if a parameter is out of range or a computed bound is
+     *     infinite.
+     */
+    public static BucketBounds exponential(
+        double scale,
+        double growthFactor,
+        int numBoundedBuckets,
+        BoundsInclusivity boundsInclusivity) {
+      checkArgument(scale > 0.0, "scale should be positive.");
+      checkArgument(growthFactor > 1.0, "growth factor should be greater than 1.0.");
+      checkArgument(
+          numBoundedBuckets > 0, "number of bounded buckets should be greater than zero.");
+      checkArgument(
+          numBoundedBuckets <= Integer.MAX_VALUE - 2,
+          "number of bounded buckets should be less than max value of integer.");
+
+      // numBoundedBuckets bounded buckets are delimited by numBoundedBuckets + 1 bounds; the range
+      // check above guarantees this addition cannot overflow.
+      final int boundCount = numBoundedBuckets + 1;
+      ImmutableList.Builder<Double> bounds = new ImmutableList.Builder<>();
+      int exponent = 0;
+      while (exponent < boundCount) {
+        double candidate = scale * Math.pow(growthFactor, exponent);
+        // Reject parameter combinations whose bounds no longer fit into a finite double.
+        checkArgument(!Double.isInfinite(candidate), "the bound has overflown double type.");
+        bounds.add(candidate);
+        exponent++;
+      }
+
+      return new AutoValue_Histogram_BucketBounds(bounds.build(), boundsInclusivity);
+    }
+
+    /**
+     * Convenience overload of {@link #exponential(double, double, int, BoundsInclusivity)} that
+     * passes BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE as the
+     * boundsInclusivity parameter.
+     */
+    public static BucketBounds exponential(
+        double scale, double growthFactor, int numBoundedBuckets) {
+      // Default: each bucket includes its lower bound and excludes its upper bound.
+      BoundsInclusivity defaultInclusivity =
+          BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE;
+      return exponential(scale, growthFactor, numBoundedBuckets, defaultInclusivity);
+    }
+
+    /**
+     * Static factory method for defining bounds of linear histogram and 
calculating bounds based on
+     * the parameters.
+     *
+     * @param offset value of the lower bound for the first bounded bucket.
+     * @param width bucket width.
+     * @param numBoundedBuckets integer determining the total number of 
bounded buckets within the
+     *     histogram.
+     * @param boundsInclusivity enum value which defines if lower or upper 
bounds are
+     *     inclusive/exclusive.
+     */
+    public static BucketBounds linear(
+        double offset, double width, int numBoundedBuckets, BoundsInclusivity 
boundsInclusivity) {
+      checkArgument(width > 0.0, "width of buckets should be positive.");
+      checkArgument(numBoundedBuckets > 0, "number of bounded buckets should 
be more than zero.");
+      checkArgument(
+          numBoundedBuckets <= Integer.MAX_VALUE - 2,
+          "number of bounded buckets should be less than max value of 
integer.");
+
+      ImmutableList.Builder<Double> boundsCalculated = new 
ImmutableList.Builder<>();
+      // The number of bounds is equal to the numBoundedBuckets + 1.
+      for (int i = 0; i <= numBoundedBuckets; i++) {
+        double bound = offset + i * width;
+        if (Double.isInfinite(bound)) {
+          throw new IllegalArgumentException("the bound has overflown double 
type.");
+        }
+        boundsCalculated.add(bound);
+      }

Review Comment:
   I think this can certainly be done, but the optimization would take additional
time and effort to research, and it is not directly in the scope of the combiner.
It could be tracked as a TODO item for a future optimization. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to