[
https://issues.apache.org/jira/browse/BEAM-2728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274971#comment-16274971
]
ASF GitHub Bot commented on BEAM-2728:
--------------------------------------
lukecwik commented on a change in pull request #3686: [BEAM-2728] Extension for
sketch-based statistics
URL: https://github.com/apache/beam/pull/3686#discussion_r154448029
##########
File path:
sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/frequency/SketchFrequencies.java
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sketching.frequency;
+
+import com.clearspring.analytics.stream.frequency.CountMinSketch;
+import com.clearspring.analytics.stream.frequency.FrequencyMergeException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code PTransform}s that record an estimate of the frequency of each element in a
+ * {@code PCollection}, or of the occurrences of the values associated with each key in a
+ * {@code PCollection} of {@code KV}s.
+ *
+ * <p>This class uses the Count-min sketch structure. The papers and other useful information
+ * about it are available on this website:
+ * <a>https://sites.google.com/site/countminsketch/</a>
+ * <br>The implementation comes from StreamLib ({@code com.clearspring.analytics}):
+ * <a>https://github.com/addthis/stream-lib</a>
+ */
+public class SketchFrequencies {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SketchFrequencies.class);
+
+ // do not instantiate
+ private SketchFrequencies() {
+ }
+
+ /**
+ * A {@code PTransform} that takes an input {@code PCollection<String>} and returns a
+ * {@code PCollection<CountMinSketch>} whose content is a Count-min sketch that can be queried
+ * for the number of hits of a specific element in the input {@code PCollection}.
+ *
+ * <p>The {@code seed} parameter is used to randomly generate the coefficients a and b of each
+ * hash function. Thus, the result can differ for the same stream if different seeds are used.
+ * <br>The Count-min sketch size is constant throughout the process, so the memory use is fixed.
+ * However, the dimensions are directly linked to the accuracy.
+ * <br>By default, the relative error is set to 0.1% with a 1% probability that the estimate
+ * exceeds this limit.
+ * <br>Also keep in mind that this algorithm works well on highly skewed data but gives poor
+ * results if the elements are evenly distributed.
+ *
+ * <p>See {@link CountMinSketchFn#withAccuracy(double, double)} in order to tune the parameters.
+ * <br>Also see {@link CountMinSketchFn} for more details about the algorithm's principle.
+ *
+ * <p>Example of use:
+ * <pre>{@code
+ * PCollection<String> pc = ...;
+ * PCollection<CountMinSketch> countMinSketch =
+ * pc.apply(SketchFrequencies.globally(1234));
+ * }</pre>
+ *
+ * @param seed the seed used for generating randomly different hash functions
+ */
+ public static Combine.Globally<String, CountMinSketch> globally(int seed) {
+ return Combine.<String, CountMinSketch>globally(CountMinSketchFn
+ .create(seed).withAccuracy(0.001, 0.99));
+ }
+
+ /**
+ * A {@code PTransform} that takes an input {@code PCollection<KV<K, String>>} and
+ * returns a {@code PCollection<KV<K, CountMinSketch>>} that contains an output element mapping
+ * each distinct key in the input {@code PCollection} to a structure that can be queried for the
+ * count of a specific element associated with that key in the input {@code PCollection}.
+ *
+ * <p>The {@code seed} parameter is used to randomly generate the coefficients a and b of each
+ * hash function. Thus, the result can differ for the same stream if different seeds are used.
+ * <br>The Count-min sketch size is constant throughout the process, so the memory use is fixed.
+ * However, the dimensions are directly linked to the accuracy.
+ * <br>By default, the relative error is set to 0.1% with a 1% probability that the estimate
+ * exceeds this limit.
+ * <br>Also keep in mind that this algorithm works well on highly skewed data but gives poor
+ * results if the elements are evenly distributed.
+ *
+ * <p>See {@link CountMinSketchFn#withAccuracy(double, double)} in order to tune the parameters.
+ * <br>Also see {@link CountMinSketchFn} for more details about the algorithm's principle.
+ *
+ * <p>Example of use:
+ * <pre>{@code
+ * PCollection<KV<Integer, String>> pc = ...;
+ * PCollection<KV<Integer, CountMinSketch>> countMinSketch =
+ * pc.apply(SketchFrequencies.<Integer>perKey(1234));
+ * }</pre>
+ *
+ * @param seed the seed used for generating different hash functions
+ * @param <K> the type of the keys in the input and output {@code PCollection}s
+ */
+ public static <K> Combine.PerKey<K, String, CountMinSketch> perKey(int seed) {
+ return Combine.<K, String, CountMinSketch>perKey(CountMinSketchFn
+ .create(seed).withAccuracy(0.001, 0.99));
+ }
+
+ /**
+ * A {@code Combine.CombineFn} that computes the {@link CountMinSketch} structure
+ * of an {@code Iterable} of Strings, useful as an argument to {@link Combine#globally} or
+ * {@link Combine#perKey}.
+ *
+ * <p>When an element is added to the Count-min sketch, it is mapped to one column in each
+ * row using different hash functions, and the counter in each of those columns is updated.
+ * <br>Collisions happen as soon as the number of distinct elements in the stream is greater
+ * than the width of the sketch. Each counter might be associated with many items, so the
+ * frequency of an element is never underestimated. On average the relative error on a counter
+ * is bounded, but some counters can be very inaccurate.
+ * <br>That's why different hash functions are used to map the same element to different
+ * counters. Thus, the overestimation for each counter will differ as there will be different
+ * collisions, and one will probably be less inaccurate than the average.
+ *
+ * <p>Both the average relative error and the probability that an estimate exceeds this
+ * error can be computed from the dimensions of the sketch, and vice versa.
+ * Thus, for a Count-min sketch with 10 000 columns and 7 rows, the relative error should be
+ * no more than 0.02% in 99% of the cases.
+ */
+ public static class CountMinSketchFn
+ extends Combine.CombineFn<String, CountMinSketch, CountMinSketch> {
+
+ private final int depth;
+
+ private final int width;
+
+ private final int seed;
+
+ private CountMinSketchFn(double eps, double confidence, int seed) {
+ this.width = (int) Math.ceil(2 / eps);
+ this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
+ this.seed = seed;
+ }
+
+ private CountMinSketchFn(int width, int depth, int seed) {
+ this.width = width;
+ this.depth = depth;
+ this.seed = seed;
+ }
+
+ /**
+ * Returns a {@code CountMinSketchFn} combiner whose Count-min sketch estimates the
+ * frequencies with a relative error of about 0.1%, guaranteed with 99% confidence.
+ * The resulting dimensions are 2000 x 7. They stay constant during the whole aggregation.
+ *
+ * <p>The {@code seed} parameter is used to generate different hash functions of the form:
+ * <pre>((a * i + b) % p) % width,</pre>
+ * where a and b are chosen randomly and p is a prime number larger than the maximum i value.
+ *
+ * <p>Example of use:
+ * <br>1) Globally :
+ * <pre>{@code
+ * PCollection<String> pc = ...;
+ * PCollection<CountMinSketch> countMinSketch =
+ * pc.apply(Combine.globally(CountMinSketchFn.create(1234)));
+ * }</pre>
+ * <br>2) Per key :
+ * <pre>{@code
+ * PCollection<KV<Integer, String>> pc = ...;
+ * PCollection<KV<Integer, CountMinSketch>> countMinSketch =
+ * pc.apply(Combine.perKey(CountMinSketchFn.create(1234)));
+ * }</pre>
+ *
+ * @param seed the seed used for generating different hash functions
+ */
+ public static CountMinSketchFn create(int seed) {
+ return new CountMinSketchFn(0.001, 0.99, seed);
+ }
+
+ /**
+ * Returns a {@code CountMinSketchFn} combiner that will have a Count-min sketch of
+ * dimensions {@code width x depth}, which stay constant during the whole aggregation.
+ * This method can only be applied to a {@link CountMinSketchFn} already created with the
+ * method {@link CountMinSketchFn#create(int)}.
+ *
+ * <p>The greater the {@code width}, the lower the expected relative error {@code epsilon}:
+ * <pre>{@code epsilon = 2 / width}</pre>
+ *
+ * <p>The greater the {@code depth}, the lower the probability of actually getting
+ * a greater relative error than expected:
+ * <pre>{@code confidence = 1 - 2^-depth}</pre>
+ *
+ * <p>Example of use:
+ * <br>1) Globally :
+ * <pre> {@code
+ * PCollection<String> pc = ...;
+ * PCollection<CountMinSketch> countMinSketch =
+ * pc.apply(Combine.globally(CountMinSketchFn.create(1234)
+ * .withDimensions(10000, 7)));
+ * } </pre>
+ * <br>2) Per key :
+ * <pre> {@code
+ * PCollection<KV<Integer, String>> pc = ...;
+ * PCollection<KV<Integer, CountMinSketch>> countMinSketch =
+ * pc.apply(Combine.perKey(CountMinSketchFn.create(1234)
+ * .withDimensions(10000, 7)));
+ * } </pre>
+ *
+ * @param width Number of columns, i.e. number of counters per row.
+ * @param depth Number of rows, i.e. number of hash functions.
+ */
+ public CountMinSketchFn withDimensions(int width, int depth) {
+ if (width <= 0 || depth <= 0) {
+ throw new IllegalArgumentException("depth and width must be positive.");
+ }
+ return new CountMinSketchFn(width, depth, this.seed);
+ }
+
+ /**
+ * Returns a {@code CountMinSketchFn} combiner that will be as accurate as specified. The
+ * relative error {@code epsilon} can be guaranteed only with a certain {@code confidence},
+ * which has to be between 0 and 1 (1 being of course impossible). Those parameters
+ * determine the size of the Count-min sketch in which the elements will be aggregated.
+ * This method can only be applied to a {@link CountMinSketchFn} already created with the
+ * method {@link CountMinSketchFn#create(int)}.
+ *
+ * <p>The lower the {@code epsilon} value, the greater the width.
+ * <pre>{@code width = ceil(2 / epsilon)}</pre>
+ *
+ * <p>The greater the confidence, the greater the depth.
+ * <pre>{@code depth = ceil(-log2(1 - confidence))}</pre>
+ *
+ * <p>Example of use:
+ * <br>1) Globally :
+ * <pre>{@code
+ * PCollection<String> pc = ...;
+ * PCollection<CountMinSketch> countMinSketch =
+ * pc.apply(Combine.globally(CountMinSketchFn.create(1234)
+ * .withAccuracy(0.001, 0.99)));
+ * }</pre>
+ * <br>2) Per key :
+ * <pre>{@code
+ * PCollection<KV<Integer, String>> pc = ...;
+ * PCollection<KV<Integer, CountMinSketch>> countMinSketch =
+ * pc.apply(Combine.perKey(CountMinSketchFn.create(1234)
+ * .withAccuracy(0.001, 0.99)));
+ * }</pre>
+ *
+ * @param epsilon the relative error of the result
+ * @param confidence the probability that the result does not exceed the relative error
+ */
+ public CountMinSketchFn withAccuracy(double epsilon, double confidence) {
+ return new CountMinSketchFn(epsilon, confidence, this.seed);
+ }
+
+ @Override public CountMinSketch createAccumulator() {
+ return new CountMinSketch(this.depth, this.width, this.seed);
+ }
+
+ @Override public CountMinSketch addInput(CountMinSketch accumulator, String element) {
+ accumulator.add(element, 1);
+ return accumulator;
+ }
+
+ @Override public CountMinSketch mergeAccumulators(Iterable<CountMinSketch> accumulators) {
+ Iterator<CountMinSketch> it = accumulators.iterator();
+ if (!it.hasNext()) {
+ // Same argument order as in createAccumulator(): depth, width, seed.
+ return new CountMinSketch(depth, width, seed);
+ }
+ CountMinSketch merged = it.next();
+ try {
+ while (it.hasNext()) {
+ merged = CountMinSketch.merge(merged, it.next());
+ }
+ } catch (FrequencyMergeException e) {
+ // Should never happen because all the accumulators created are of the same type.
+ LOG.error(e.getMessage(), e);
+ }
+ return merged;
+ }
+
+ @Override public CountMinSketch extractOutput(CountMinSketch accumulator) {
+ return accumulator;
+ }
+
+ @Override public Coder<CountMinSketch> getAccumulatorCoder(CoderRegistry registry,
+ Coder inputCoder) {
+ return new CountMinSketchCoder();
+ }
+
+ @Override public Coder<CountMinSketch> getDefaultOutputCoder(CoderRegistry registry,
+ Coder inputCoder) throws CannotProvideCoderException {
+ return new CountMinSketchCoder();
+ }
+
+ @Override public CountMinSketch defaultValue() {
+ return new CountMinSketch(1, 1, 1);
+ }
+ }
+
+ static class CountMinSketchCoder extends CustomCoder<CountMinSketch> {
+
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+
+ @Override public void encode(CountMinSketch value, OutputStream outStream) throws IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Count-min Sketch");
+ }
+ BYTE_ARRAY_CODER.encode(CountMinSketch.serialize(value), outStream);
+ }
+
+ @Override public CountMinSketch decode(InputStream inStream) throws IOException {
+ return CountMinSketch.deserialize(BYTE_ARRAY_CODER.decode(inStream));
+ }
+
+ @Override public boolean consistentWithEquals() {
+ return false;
+ }
+
+ @Override public boolean isRegisterByteSizeObserverCheap(CountMinSketch value) {
+ return true;
+ }
+
+ @Override protected long getEncodedElementByteSize(CountMinSketch value) throws IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Count-min Sketch");
+ } else {
+ // depth and width as computed in the CountMinSketch constructor from the relative error
+ // and confidence.
+ int width = (int) Math.ceil(2 / value.getRelativeError());
Review comment:
I think @jkff is pointing out that it would be best if we didn't duplicate
the internal details of StreamLib and instead asked them to expose this
functionality.
I wouldn't block merging on this. Instead, add a TODO to replace it with a
direct call to StreamLib's implementation once it is available, and link to a
JIRA or PR against StreamLib that exposes it.
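
To make the duplication concrete, here is a minimal sketch of the dimension arithmetic that the coder re-derives from the relative error and confidence. `SketchDimensions` is a hypothetical helper name used only for illustration; it is not part of Beam or StreamLib. The two formulas are the ones in the `CountMinSketchFn` constructor in the diff above.

```java
/**
 * Hypothetical helper (not part of Beam or StreamLib) that isolates the
 * width/depth arithmetic so it lives in one place and could later be
 * replaced by a call into StreamLib if such an accessor is exposed.
 */
public final class SketchDimensions {

  private SketchDimensions() {
  }

  /** Number of columns: width = ceil(2 / epsilon). */
  public static int width(double epsilon) {
    if (epsilon <= 0 || epsilon >= 1) {
      throw new IllegalArgumentException("epsilon must be in (0, 1)");
    }
    return (int) Math.ceil(2 / epsilon);
  }

  /** Number of rows: depth = ceil(-log2(1 - confidence)). */
  public static int depth(double confidence) {
    if (confidence <= 0 || confidence >= 1) {
      throw new IllegalArgumentException("confidence must be in (0, 1)");
    }
    return (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
  }

  public static void main(String[] args) {
    // Defaults used by SketchFrequencies: epsilon = 0.001, confidence = 0.99.
    System.out.println(width(0.001) + " x " + depth(0.99));
  }
}
```

Keeping the arithmetic behind one small class like this would make the TODO a one-line change once StreamLib exposes the dimensions.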
----------------------------------------------------------------
> Extension for sketch-based statistics
> -------------------------------------
>
> Key: BEAM-2728
> URL: https://issues.apache.org/jira/browse/BEAM-2728
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-extensions
> Reporter: Arnaud Fournier
> Assignee: Arnaud Fournier
> Priority: Minor
>
> Goal : Provide an extension library to compute approximate statistics on
> streams.
> Interest : Probabilistic data structures can create an approximation (sketch)
> of the current state of a stream without storing every element but rather
> processing each observation quickly to summarize its current state and find
> useful statistical insights.
> Implementation is here :
> https://github.com/ArnaudFnr/beam/tree/sketching/sdks/java/extensions/sketching
> More info :
> https://docs.google.com/document/d/1Xy6g5RPBYX_HadpIr_2WrUeusiwL0Jo2ACI5PEOP1kc/edit
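
For readers unfamiliar with the idea of a sketch described in the issue, the following is a small, self-contained illustration of a Count-min sketch. It is not the StreamLib implementation used by the pull request: the class name `TinyCountMinSketch`, the choice of prime, and the use of `String.hashCode()` are assumptions made only for this example. It uses hash functions of the form ((a * i + b) % p) % width, with one hash function per row.

```java
import java.util.Random;

/** Illustrative Count-min sketch; a toy, not the StreamLib class. */
public final class TinyCountMinSketch {

  // Prime modulus 2^31 - 1 (an assumption made for this example).
  private static final long PRIME = 2147483647L;

  private final int depth;
  private final int width;
  private final long[][] table;
  private final long[] a;
  private final long[] b;

  public TinyCountMinSketch(int depth, int width, int seed) {
    if (depth <= 0 || width <= 0) {
      throw new IllegalArgumentException("depth and width must be positive.");
    }
    this.depth = depth;
    this.width = width;
    this.table = new long[depth][width];
    this.a = new long[depth];
    this.b = new long[depth];
    Random random = new Random(seed);
    for (int row = 0; row < depth; row++) {
      a[row] = 1 + random.nextInt(Integer.MAX_VALUE - 1); // in [1, p - 1]
      b[row] = random.nextInt(Integer.MAX_VALUE);         // in [0, p - 1]
    }
  }

  /** Maps an item to one column of the given row. */
  private int bucket(int row, String item) {
    long i = item.hashCode() & 0x7fffffffL; // non-negative 31-bit value
    return (int) (((a[row] * i + b[row]) % PRIME) % width);
  }

  /** Adds count occurrences of item: one counter per row is incremented. */
  public void add(String item, long count) {
    for (int row = 0; row < depth; row++) {
      table[row][bucket(row, item)] += count;
    }
  }

  /** Returns the minimum counter over all rows; never below the true count. */
  public long estimateCount(String item) {
    long min = Long.MAX_VALUE;
    for (int row = 0; row < depth; row++) {
      min = Math.min(min, table[row][bucket(row, item)]);
    }
    return min;
  }

  public static void main(String[] args) {
    TinyCountMinSketch sketch = new TinyCountMinSketch(7, 2000, 1234);
    for (int n = 0; n < 5; n++) {
      sketch.add("a", 1);
    }
    sketch.add("b", 2);
    System.out.println("estimate for a: " + sketch.estimateCount("a"));
  }
}
```

Taking the minimum over the rows is what limits the effect of collisions: a counter can only be inflated by other items, so the smallest counter is the least inflated one.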