[ 
https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14730561#comment-14730561
 ] 

ASF GitHub Bot commented on FLINK-2030:
---------------------------------------

Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/861#discussion_r38735611
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/accumulators/ContinuousHistogram.java
 ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.accumulators;
    +
    +import java.util.AbstractMap;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import static java.lang.Double.MAX_VALUE;
    +
    +/**
    + * A Histogram accumulator designed for Continuous valued data.
    + * It supports:
    + * -- {@link #quantile(double)}
    + *                 Computes a quantile of the data
    + * -- {@link #count(double)}
    + *         Computes number of items less than the given value in the data
    + * <p>
    + * A continuous histogram stores values in bins in sorted order and keeps 
their associated
    + * number of items. It is assumed that the items associated with every bin 
are scattered around
    + * it, half to the right and half to the left.
    + * <p>
    + * bin counters:  m_1    m_2    m_3    m_4    m_5    m_6
    + *                10     12     5      10     4      6
    + *                |  5   |  6   |  2.5 |  5   |  2   |
    + *             5  |  +   |  +   |   +  |  +   |  +   |  3
    + *                |  6   |  2.5 |   5  |  2   |  3   |
    + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    + * bin index:     1      2      3      4      5      6
    + * bin values:    v_1 <  v_2 <  v_3 <  v_4 <  v_5 <  v_6
    + * <p>
    + * The number of items between v_i and v_(i+1) is directly proportional to 
the area of
    + * trapezoid (v_i, v_(i+1), m_(i+1), m_i)
    + * <p>
    + * Adapted from Ben-Haim and Yom-Tov's
    + * <a href = 
"http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf">Streaming 
Decision Tree Algorithm's histogram</a>
    + */
    +public class ContinuousHistogram implements Accumulator<Double, 
TreeMap<Double, Integer>> {
    +
    +   protected TreeMap<Double, Integer> treeMap = new TreeMap<Double, 
Integer>();
    +
    +   protected long counter = 0;
    +
    +   private int bin;
    +
    +   private double lower;
    +
    +   private double upper;
    +
    +   private PriorityQueue<KeyDiff> diffQueue;
    +
    +   private HashMap<Double, KeyProps> keyUpdateTimes;
    +
    +   private long timestamp;
    +
    +   /**
    +    * Creates a new continuous histogram with the given number of bins.
    +    * The number of bins is the number of values the histogram stores to 
approximate the continuous
    +    * data set. The higher this value, the more we move towards an exact 
representation of the data.
    +    *
    +    * @param numBins Number of bins in the histogram
    +    */
    +   public ContinuousHistogram(int numBins) {
    +           if (numBins <= 0) {
    +                   throw new IllegalArgumentException("Number of bins must 
be greater than zero");
    +           }
    +           bin = numBins;
    +           lower = MAX_VALUE;
    +           upper = -MAX_VALUE;
    +           diffQueue = new PriorityQueue<>();
    +           keyUpdateTimes = new HashMap<>();
    +           timestamp = 0;
    +   }
    +
    +   /**
    +    * Consider using {@link #add(double)} for primitive double values to 
get better performance.
    +    */
    +   @Override
    +   public void add(Double value) {
    +           add(value, 1);
    +   }
    +
    +   public void add(double value) {
    +           add(value, 1);
    +   }
    +
    +   @Override
    +   public TreeMap<Double, Integer> getLocalValue() {
    +           return this.treeMap;
    +   }
    +
    +   /**
    +    * Get the total number of items added to this histogram.
    +    * This is preserved across merge operations.
    +    *
    +    * @return Total number of items added to the histogram
    +    */
    +   public long getTotal() {
    +           return counter;
    +   }
    +
    +   /**
    +    * Get the current size of the {@link #treeMap}
    +    *
    +    * @return Size of the {@link #treeMap}
    +    */
    +   public int getSize() {
    +           return treeMap.size();
    +   }
    +
    +   @Override
    +   public void resetLocal() {
    +           treeMap.clear();
    +           counter = 0;
    +           lower = MAX_VALUE;
    +           upper = -MAX_VALUE;
    +           diffQueue.clear();
    +           keyUpdateTimes.clear();
    +   }
    +
    +   @Override
    +   public void merge(Accumulator<Double, TreeMap<Double, Integer>> other) {
    +           fill(other.getLocalValue().entrySet());
    +   }
    +
    +   /**
    +    * Merges the given other histogram into this histogram, with the 
number of bins in the
    +    * merged histogram being {@code numBins}.
    +    *
    +    * @param other   Histogram to be merged
    +    * @param numBins Bins in the merged histogram
    +    */
    +   public void merge(Accumulator<Double, TreeMap<Double, Integer>> other, 
int numBins) {
    +           bin = numBins;
    +           merge(other);
    +   }
    +
    +   @Override
    +   public Accumulator<Double, TreeMap<Double, Integer>> clone() {
    +           ContinuousHistogram result = new ContinuousHistogram(bin);
    +           result.treeMap = new TreeMap<>(treeMap);
    +           result.counter = counter;
    +           result.lower = lower;
    +           result.upper = upper;
    +           // initialize all differences and key update times for the new 
histogram
    +           result.computeDiffs();
    +           return result;
    +   }
    +
    +   void add(double value, int count) {
    +           addValue(value, count);
    +           if (getSize() > bin) {
    +                   mergeBins();
    +           }
    +   }
    +
    +   void fill(Set<Map.Entry<Double, Integer>> entries) {
    --- End diff --
    
    Same as above


> Implement an online histogram with Merging and equalization features
> --------------------------------------------------------------------
>
>                 Key: FLINK-2030
>                 URL: https://issues.apache.org/jira/browse/FLINK-2030
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Machine Learning Library
>            Reporter: Sachin Goel
>            Assignee: Sachin Goel
>            Priority: Minor
>              Labels: ML
>
> For the implementation of the decision tree in 
> https://issues.apache.org/jira/browse/FLINK-1727, we need to implement a 
> histogram with online updates, merging and equalization features. A reference 
> implementation is provided in [1].
> [1] http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to