[
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14499428#comment-14499428
]
ASF GitHub Bot commented on FLINK-1297:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/605#discussion_r28575964
--- Diff:
flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import
com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information of data that
has only been processed by one pass
+ * This statistical information is meant to help determine the
distribution of the data that has been processed
+ * in an operator so that we can determine if it is necessary to
repartition the data
+ *
+ * The statistics to be gathered are configurable and represented by a
{@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure
enabling estimation of count distinct and a
+ * structure holding the heavy hitters along with their frequency.
+ *
+ */
+public class OperatorStatistics implements Serializable {
+
+ OperatorStatisticsConfig config;
+
+ Object min;
+ Object max;
+ ICardinality countDistinct;
+ IHeavyHitter heavyHitter;
+ long cardinality = 0;
+
+ public OperatorStatistics(OperatorStatisticsConfig config) {
+ this.config = config;
+ if
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING))
{
+ countDistinct = new
LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+ }
+
if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){
+ countDistinct = new
HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+ }
+ if
(config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){
+ heavyHitter =
+ new
LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+ }
+ if
(config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){
+ heavyHitter =
+ new
CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+
OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+
OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+
OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+ }
+ }
+
+ public void process(Object tupleObject){
+ if (tupleObject instanceof Comparable) {
+ if (config.collectMin && (min == null || ((Comparable)
tupleObject).compareTo(min) < 0)) {
+ min = tupleObject;
+ }
+ if (config.collectMax && (max == null || ((Comparable)
tupleObject).compareTo(max) > 0)) {
+ max = tupleObject;
+ }
+ }
+ if (config.collectCountDistinct){
+ countDistinct.offer(tupleObject);
+ }
+ if (config.collectHeavyHitters){
+ heavyHitter.addObject(tupleObject);
+ }
+ cardinality+=1;
+ }
+
+ public void merge(OperatorStatistics other){
+
+ if (this.config.collectMin &&
((Comparable)this.min).compareTo(other.min) > 0 ) {
+ this.min = other.min;
+ }
+ if (this.config.collectMax &&
((Comparable)this.max).compareTo(other.max) < 0 ) {
+ this.max = other.max;
+ }
+
+ try {
+ this.heavyHitter.merge(other.heavyHitter);
+ } catch (HeavyHitterMergeException e) {
+ e.printStackTrace();
--- End diff --
Please rethrow this exception instead of only printing the stack trace.
> Add support for tracking statistics of intermediate results
> -----------------------------------------------------------
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime
> Reporter: Alexander Alexandrov
> Assignee: Alexander Alexandrov
> Fix For: 0.9
>
> Original Estimate: 1,008h
> Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the
> runtime code with a statistics facility that collects the required
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic
> statistics for the (intermediate) result of dataflows (e.g. min, max, count,
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have
> some sort of detailed sketch about the key distribution of an intermediate
> result. I am not sure whether a simple histogram is the most effective way to
> go. Maybe somebody would propose another lightweight sketch that provides
> better accuracy.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)