This is an automated email from the ASF dual-hosted git repository. rarokni pushed a commit to branch revert-12973-hll in repository https://gitbox.apache.org/repos/asf/beam.git
commit c7d80a855ea7b1897ab24b2d4a1db126c0ddbc7f Author: Reza Rokni <[email protected]> AuthorDate: Tue Jan 5 15:25:31 2021 +0800 Revert "[BEAM-10234] Create ApproximateDistinct using HLL Impl" --- .../beam/sdk/transforms/ApproximateUnique.java | 20 +- .../zetasketch/ApproximateCountDistinct.java | 288 ----------------- .../zetasketch/ApproximateCountDistinctTest.java | 342 --------------------- 3 files changed, 9 insertions(+), 641 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java index 760883a..c943084 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java @@ -43,22 +43,20 @@ import org.checkerframework.checker.nullness.qual.Nullable; * {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or * the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s. * - * @deprecated - * <p>Consider using {@code ApproximateCountDistinct} in the {@code zetasketch} extension - * module, which makes use of the {@code HllCount} implementation. - * <p>If {@code ApproximateCountDistinct} does not meet your needs then you can directly use - * {@code HllCount}. Direct usage will also give you access to save intermediate aggregation - * result into a sketch for later processing. - * <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}: - * <pre>{@code + * <p>Consider using {@code HllCount} in the {@code zetasketch} extension module if you need better + * performance or need to save intermediate aggregation result into a sketch for later processing. + * + * <p>For example, to estimate the number of distinct elements in a {@code PCollection<String>}: + * + * <pre>{@code * PCollection<String> input = ...; * PCollection<Long> countDistinct = * input.apply(HllCount.Init.forStrings().globally()).apply(HllCount.Extract.globally()); * }</pre> - * For more details about using {@code HllCount} and the {@code zetasketch} extension module, - * see https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7. + * + * For more details about using {@code HllCount} and the {@code zetasketch} extension module, see + * https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7. */ -@Deprecated public class ApproximateUnique { /** diff --git a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java b/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java deleted file mode 100644 index 9b9daf5..0000000 --- a/sdks/java/extensions/zetasketch/src/main/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinct.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.zetasketch; - -import com.google.auto.value.AutoValue; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.extensions.zetasketch.HllCount.Init.Builder; -import org.apache.beam.sdk.transforms.Contextful; -import org.apache.beam.sdk.transforms.Contextful.Fn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ProcessFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@code PTransform}s for estimating the number of distinct elements in a {@code PCollection}, or - * the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s. - * - * <p>We make use of the {@link HllCount} implementation for this transform. Please use {@link - * HllCount} directly if you need access to the sketches. - * - * <p>If the object is not one of {@link Byte[]} {@link Integer} {@link Double} {@link String} make - * use of {@link Globally#via} or {@link PerKey#via} - * - * <h3>Examples</h3> - * - * <h4>Example 1: Approximate Count of Ints {@code PCollection<Integer>} and specify precision</h4> - * - * <pre>{@code - * p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.globally() - * .withPercision(PRECISION)); - * - * }</pre> - * - * <h4>Example 2: Approximate Count of Key Value {@code PCollection<KV<Integer,Foo>>}</h4> - * - * <pre>{@code - * PCollection<KV<Integer, Long>> result = - * p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey()); - * - * }</pre> - * - * <h4>Example 3: Approximate Count of Key Value {@code PCollection<KV<Integer,Foo>>}</h4> - * - * <pre>{@code - * PCollection<KV<Integer, Foo>> approxResultInteger = - * p.apply("Int", Create.of(Foo)) - * .apply("IntHLL", ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey() - * .via(kv -> KV.of(kv.getKey(), (long) kv.getValue().hashCode()))); - * }</pre> - */ -@Experimental -public class ApproximateCountDistinct { - - private static final Logger LOG = LoggerFactory.getLogger(ApproximateCountDistinct.class); - - private static final List<TypeDescriptor<?>> HLL_IMPLEMENTED_TYPES = - ImmutableList.of( - TypeDescriptors.strings(), - TypeDescriptors.longs(), - TypeDescriptors.integers(), - new TypeDescriptor<byte[]>() {}); - - public static <T> Globally<T> globally() { - return new AutoValue_ApproximateCountDistinct_Globally.Builder<T>() - .setPrecision(HllCount.DEFAULT_PRECISION) - .build(); - } - - public static <K, V> PerKey<K, V> perKey() { - return new AutoValue_ApproximateCountDistinct_PerKey.Builder<K, V>() - .setPrecision(HllCount.DEFAULT_PRECISION) - .build(); - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * {@code PTransform} for estimating the number of distinct elements in a {@code PCollection}. - * - * @param <T> the type of the elements in the input {@code PCollection} - */ - @AutoValue - public abstract static class Globally<T> extends PTransform<PCollection<T>, PCollection<Long>> { - - public abstract int getPrecision(); - - public abstract Builder<T> toBuilder(); - - @Nullable - public abstract Contextful<Fn<T, Long>> getMapping(); - - @AutoValue.Builder - public abstract static class Builder<T> { - - public abstract Builder<T> setPrecision(int precision); - - public abstract Builder<T> setMapping(Contextful<Fn<T, Long>> value); - - public abstract Globally<T> build(); - } - - public Globally<T> via(ProcessFunction<T, Long> fn) { - - return toBuilder().setMapping(Contextful.<T, Long>fn(fn)).build(); - } - - public <V> Globally<V> withPercision(Integer withPercision) { - @SuppressWarnings("unchecked") - Globally<V> globally = (Globally<V>) toBuilder().setPrecision(withPercision).build(); - return globally; - } - - @Override - public PCollection<Long> expand(PCollection<T> input) { - - TypeDescriptor<T> type = input.getCoder().getEncodedTypeDescriptor(); - - if (HLL_IMPLEMENTED_TYPES.contains(type)) { - - HllCount.Init.Builder<T> builder = builderForType(type); - - return input.apply(builder.globally()).apply(HllCount.Extract.globally()); - } - - // Boiler plate to avoid [argument.type.incompatible] NonNull vs Nullable - Contextful<Fn<T, Long>> mapping = getMapping(); - - if (mapping != null) { - return input - .apply(MapElements.into(TypeDescriptors.longs()).via(mapping)) - .apply(HllCount.Init.forLongs().globally()) - .apply(HllCount.Extract.globally()); - } - - throw new IllegalArgumentException( - String.format( - "%s supports Integer," - + " Long, String and byte[] objects directly. For other types you must provide a Mapping function.", - this.getClass().getCanonicalName())); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - ApproximateCountDistinct.populateDisplayData(builder, getPrecision()); - } - } - - @AutoValue - public abstract static class PerKey<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> { - - public abstract Integer getPrecision(); - - @Nullable - public abstract Contextful<Fn<KV<K, V>, KV<K, Long>>> getMapping(); - - public abstract Builder<K, V> toBuilder(); - - @AutoValue.Builder - public abstract static class Builder<K, V> { - - public abstract Builder<K, V> setPrecision(Integer precision); - - public abstract Builder<K, V> setMapping(Contextful<Fn<KV<K, V>, KV<K, Long>>> value); - - public abstract PerKey<K, V> build(); - } - - public <K2, V2> PerKey<K2, V2> withPercision(Integer withPercision) { - // Work around for loss of type inference when using API. - @SuppressWarnings("unchecked") - PerKey<K2, V2> perKey = (PerKey<K2, V2>) this.toBuilder().setPrecision(withPercision).build(); - return perKey; - } - - public PerKey<K, V> via(ProcessFunction<KV<K, V>, KV<K, Long>> fn) { - - return this.toBuilder().setMapping(Contextful.<KV<K, V>, KV<K, Long>>fn(fn)).build(); - } - - @Override - public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) { - - Coder<V> coder = ((KvCoder<K, V>) input.getCoder()).getValueCoder(); - - TypeDescriptor<V> type = coder.getEncodedTypeDescriptor(); - - if (HLL_IMPLEMENTED_TYPES.contains(type)) { - - HllCount.Init.Builder<V> builder = builderForType(type); - - return input.apply(builder.perKey()).apply(HllCount.Extract.perKey()); - } - - // Boiler plate to avoid [argument.type.incompatible] NonNull vs Nullable - Contextful<Fn<KV<K, V>, KV<K, Long>>> mapping = getMapping(); - - if (mapping != null) { - Coder<K> keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder(); - return input - .apply( - MapElements.into( - TypeDescriptors.kvs( - keyCoder.getEncodedTypeDescriptor(), TypeDescriptors.longs())) - .via(mapping)) - .apply(HllCount.Init.forLongs().perKey()) - .apply(HllCount.Extract.perKey()); - } - - throw new IllegalArgumentException( - String.format( - "%s supports Integer," - + " Long, String and byte[] objects directly not for %s type, you must provide a Mapping use via.", - this.getClass().getCanonicalName(), type.toString())); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - ApproximateCountDistinct.populateDisplayData(builder, getPrecision()); - } - } - - ///////////////////////////////////////////////////////////////////////////// - - private static void populateDisplayData(DisplayData.Builder builder, Integer precision) { - builder.add(DisplayData.item("precision", precision).withLabel("Precision")); - } - - // HLLCount supports, Long, Integers, String and Byte primitives. - // We will return an appropriate builder - protected static <T> Builder<T> builderForType(TypeDescriptor<T> input) { - - @SuppressWarnings("rawtypes") - HllCount.Init.Builder builder = null; - - if (input.equals(TypeDescriptors.strings())) { - builder = HllCount.Init.forStrings(); - } - if (input.equals(TypeDescriptors.longs())) { - builder = HllCount.Init.forLongs(); - } - if (input.equals(TypeDescriptors.integers())) { - builder = HllCount.Init.forIntegers(); - } - if (input.equals(new TypeDescriptor<byte[]>() {})) { - builder = HllCount.Init.forBytes(); - } - - if (builder == null) { - throw new IllegalArgumentException(String.format("Type not supported %s", input)); - } - - // Safe to ignore warning, as we know the type based on the check we do above. - @SuppressWarnings("unchecked") - Builder<T> output = (Builder<T>) builder; - - return output; - } -} diff --git a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java b/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java deleted file mode 100644 index 8796d83..0000000 --- a/sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/ApproximateCountDistinctTest.java +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.zetasketch; - -import com.google.zetasketch.HyperLogLogPlusPlus; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** Tests for {@link ApproximateCountDistinct}. */ -public class ApproximateCountDistinctTest { - - @Rule public final transient TestPipeline p = TestPipeline.create(); - - // Integer - private static final List<Integer> INTS1 = Arrays.asList(1, 2, 3, 3, 1, 4); - private static final Long INTS1_ESTIMATE; - - private static final int TEST_PRECISION = 20; - - static { - HyperLogLogPlusPlus<Integer> hll = new HyperLogLogPlusPlus.Builder().buildForIntegers(); - INTS1.forEach(hll::add); - INTS1_ESTIMATE = hll.longResult(); - } - - /** Test correct Builder is returned from Generic type. * */ - @Test - public void testIntegerBuilder() { - - PCollection<Integer> ints = p.apply(Create.of(1)); - HllCount.Init.Builder<Integer> builder = - ApproximateCountDistinct.<Integer>builderForType( - ints.getCoder().getEncodedTypeDescriptor()); - PCollection<Long> result = ints.apply(builder.globally()).apply(HllCount.Extract.globally()); - PAssert.that(result).containsInAnyOrder(1L); - p.run(); - } - /** Test correct Builder is returned from Generic type. * */ - @Test - public void testStringBuilder() { - - PCollection<String> strings = p.apply(Create.<String>of("43")); - HllCount.Init.Builder<String> builder = - ApproximateCountDistinct.<String>builderForType( - strings.getCoder().getEncodedTypeDescriptor()); - PCollection<Long> result = strings.apply(builder.globally()).apply(HllCount.Extract.globally()); - PAssert.that(result).containsInAnyOrder(1L); - p.run(); - } - /** Test correct Builder is returned from Generic type. * */ - @Test - public void testLongBuilder() { - - PCollection<Long> longs = p.apply(Create.<Long>of(1L)); - HllCount.Init.Builder<Long> builder = - ApproximateCountDistinct.<Long>builderForType(longs.getCoder().getEncodedTypeDescriptor()); - PCollection<Long> result = longs.apply(builder.globally()).apply(HllCount.Extract.globally()); - PAssert.that(result).containsInAnyOrder(1L); - p.run(); - } - /** Test correct Builder is returned from Generic type. * */ - @Test - public void testBytesBuilder() { - - byte[] byteArray = new byte[] {'A'}; - PCollection<byte[]> bytes = p.apply(Create.of(byteArray)); - TypeDescriptor<byte[]> a = bytes.getCoder().getEncodedTypeDescriptor(); - HllCount.Init.Builder<byte[]> builder = - ApproximateCountDistinct.<byte[]>builderForType( - bytes.getCoder().getEncodedTypeDescriptor()); - PCollection<Long> result = bytes.apply(builder.globally()).apply(HllCount.Extract.globally()); - PAssert.that(result).containsInAnyOrder(1L); - p.run(); - } - - /** Test Integer Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesGlobalForInteger() { - PCollection<Long> approxResultInteger = - p.apply("Int", Create.of(INTS1)).apply("IntHLL", ApproximateCountDistinct.globally()); - PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE); - p.run(); - } - - /** Test Long Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesGlobalForLong() { - - PCollection<Long> approxResultLong = - p.apply("Long", Create.of(INTS1.stream().map(Long::valueOf).collect(Collectors.toList()))) - .apply("LongHLL", ApproximateCountDistinct.globally()); - - PAssert.thatSingleton(approxResultLong).isEqualTo(INTS1_ESTIMATE); - - p.run(); - } - - /** Test String Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesGlobalForStrings() { - PCollection<Long> approxResultString = - p.apply("Str", Create.of(INTS1.stream().map(String::valueOf).collect(Collectors.toList()))) - .apply("StrHLL", ApproximateCountDistinct.globally()); - - PAssert.thatSingleton(approxResultString).isEqualTo(INTS1_ESTIMATE); - - p.run(); - } - - /** Test Byte Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesGlobalForBytes() { - PCollection<Long> approxResultByte = - p.apply( - "BytesHLL", - Create.of( - INTS1.stream() - .map(x -> ByteBuffer.allocate(4).putInt(x).array()) - .collect(Collectors.toList()))) - .apply(ApproximateCountDistinct.globally()); - - PAssert.thatSingleton(approxResultByte).isEqualTo(INTS1_ESTIMATE); - - p.run(); - } - - /** Test Integer Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesPerKeyForInteger() { - - List<KV<Integer, Integer>> ints = new ArrayList<>(); - - for (int i = 0; i < 3; i++) { - for (int k : INTS1) { - ints.add(KV.of(i, k)); - } - } - - PCollection<KV<Integer, Long>> result = - p.apply("Int", Create.of(ints)).apply("IntHLL", ApproximateCountDistinct.perKey()); - - PAssert.that(result) - .containsInAnyOrder( - ImmutableList.of( - KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE))); - - p.run(); - } - - /** Test Long Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesPerKeyForLong() { - - List<KV<Integer, Long>> longs = new ArrayList<>(); - - for (int i = 0; i < 3; i++) { - for (int k : INTS1) { - longs.add(KV.of(i, (long) k)); - } - } - - PCollection<KV<Integer, Long>> result = - p.apply("Long", Create.of(longs)).apply("LongHLL", ApproximateCountDistinct.perKey()); - - PAssert.that(result) - .containsInAnyOrder( - ImmutableList.of( - KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE))); - - p.run(); - } - - /** Test String Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesPerKeyForStrings() { - List<KV<Integer, String>> strings = new ArrayList<>(); - - for (int i = 0; i < 3; i++) { - for (int k : INTS1) { - strings.add(KV.of(i, String.valueOf(k))); - } - } - - PCollection<KV<Integer, Long>> result = - p.apply("Str", Create.of(strings)).apply("StrHLL", ApproximateCountDistinct.perKey()); - - PAssert.that(result) - .containsInAnyOrder( - ImmutableList.of( - KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE))); - - p.run(); - } - - /** Test Byte Globally. */ - @Test - @Category(NeedsRunner.class) - public void testStandardTypesPerKeyForBytes() { - - List<KV<Integer, byte[]>> bytes = new ArrayList<>(); - - for (int i = 0; i < 3; i++) { - for (int k : INTS1) { - bytes.add(KV.of(i, ByteBuffer.allocate(4).putInt(k).array())); - } - } - - PCollection<KV<Integer, Long>> result = - p.apply("BytesHLL", Create.of(bytes)).apply(ApproximateCountDistinct.perKey()); - - PAssert.that(result) - .containsInAnyOrder( - ImmutableList.of( - KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE))); - - p.run(); - } - - /** Test a general object, we will make use of a KV as the object as it already has a coder. */ - @Test - @Category(NeedsRunner.class) - public void testObjectTypesGlobal() { - - PCollection<Long> approxResultInteger = - p.apply( - "Int", - Create.of( - INTS1.stream().map(x -> KV.of(x, KV.of(x, x))).collect(Collectors.toList()))) - .apply( - "IntHLL", - ApproximateCountDistinct.<KV<Integer, KV<Integer, Integer>>>globally() - .via((KV<Integer, KV<Integer, Integer>> x) -> (long) x.getValue().hashCode())); - - PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE); - - p.run(); - } - - /** Test a general object, we will make use of a KV as the object as it already has a coder. */ - @Test - @Category(NeedsRunner.class) - public void testObjectTypesPerKey() { - - List<KV<Integer, KV<Integer, Integer>>> ints = new ArrayList<>(); - - for (int i = 0; i < 3; i++) { - for (int k : INTS1) { - ints.add(KV.of(i, KV.of(i, k))); - } - } - - PCollection<KV<Integer, Long>> approxResultInteger = - p.apply("Int", Create.of(ints)) - .apply( - "IntHLL", - ApproximateCountDistinct.<Integer, KV<Integer, Integer>>perKey() - .via(x -> KV.of(x.getKey(), (long) x.hashCode())) - .withPercision(TEST_PRECISION)); - - PAssert.that(approxResultInteger) - .containsInAnyOrder( - ImmutableList.of( - KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE))); - - p.run(); - } - - /** Test a general object, we will make use of a KV as the object as it already has a coder. */ - @Test - @Category(NeedsRunner.class) - public void testGlobalPercision() { - - PCollection<Long> approxResultInteger = - p.apply("Int", Create.of(INTS1)) - .apply("IntHLL", ApproximateCountDistinct.globally().withPercision(TEST_PRECISION)); - - PAssert.thatSingleton(approxResultInteger).isEqualTo(INTS1_ESTIMATE); - - p.run(); - } - - /** Test a general object, we will make use of a KV as the object as it already has a coder. */ - @Test - @Category(NeedsRunner.class) - public void testPerKeyPercision() { - - List<KV<Integer, Integer>> ints = new ArrayList<>(); - - for (int i = 0; i < 3; i++) { - for (int k : INTS1) { - ints.add(KV.of(i, k)); - } - } - - PCollection<KV<Integer, Long>> approxResultInteger = - p.apply("Int", Create.of(ints)) - .apply("IntHLL", ApproximateCountDistinct.perKey().withPercision(TEST_PRECISION)); - - PAssert.that(approxResultInteger) - .containsInAnyOrder( - ImmutableList.of( - KV.of(0, INTS1_ESTIMATE), KV.of(1, INTS1_ESTIMATE), KV.of(2, INTS1_ESTIMATE))); - - p.run(); - } -}
