[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312673#comment-15312673 ]
ASF GitHub Bot commented on APEXMALHAR-2094: -------------------------------------------- Github user sandeep-n commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/301#discussion_r65579101 --- Diff: sketches/src/main/java/org/apache/apex/malhar/sketches/QuantilesEstimator.java --- @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sketches; + +import com.yahoo.sketches.quantiles.QuantilesSketch; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * An implementation of BaseOperator that computes a "sketch" (a representation of the probability distribution using + * a low memory footprint) of the incoming numeric data, and evaluates/outputs the cumulative distribution function and + * quantiles of the probability distribution. Leverages the quantiles sketch implementation from the Yahoo Datasketches + * Library. + * <p/> + * <b>Input Port(s) : </b><br> + * <b>data : </b> Data values input port. 
<br> + * <br> + * <b>Output Port(s) : </b> <br> + * <b>cdfOutput : </b>cumulative distribution function output port. <br> + * <b>quantilesOutput : </b>quantiles output port. <br> + * <br> + * <b>Partitions : No</b>, no will yield wrong results. <br> + * <br>+ + */ +@OperatorAnnotation(partitionable = false) +public class QuantilesEstimator extends BaseOperator +{ + /** + * Output port that emits cdf estimated at the current data point + */ + public final transient DefaultOutputPort<Double> cdfOutput = new DefaultOutputPort<>(); + /** + * Emits quantiles of stream seen thus far + */ + public final transient DefaultOutputPort<double[]> quantilesOutput = new DefaultOutputPort<>(); + /** + * Emits probability masses on specified intervals + */ + public final transient DefaultOutputPort<double[]> pmfOutput = new DefaultOutputPort<>(); + private transient QuantilesSketch quantilesSketch = QuantilesSketch.builder().build(); + /** + * This field determines the specific quantiles to be calculated. + * Default is set to compute the standard quartiles. + */ + private double[] fractions = {0.0, 0.25, 0.50, 0.75, 1.00}; + /** + * This field determines the intervals on which the probability mass function is computed. + */ + private double[] pmfIntervals = {}; + /** + * This operator computes three different quantities which are output on separate output ports. If not using any of + * these quantities, these variables can be set to avoid unnecessary computation. 
+ */ + private boolean computeCdf = true; + private boolean computeQuantiles = true; + private boolean computePmf = true; + public final transient DefaultInputPort<Double> data = new DefaultInputPort<Double>() + { + @Override + public void process(Double input) + { + + quantilesSketch.update(input); + + if (computeQuantiles) { + /** + * Computes and emits quantiles of the stream seen thus far + */ + quantilesOutput.emit(quantilesSketch.getQuantiles(fractions)); + } + + if (computeCdf) { + /** + * Emits (estimate of the) cumulative distribution function evaluated at the input value, according to the + * sketched probability distribution of the stream seen thus far. + */ + cdfOutput.emit(quantilesSketch.getCDF(new double[]{input})[0]); --- End diff -- Wrapping the single input in a one-element array and taking element [0] looks ugly, but getCDF is a method of the QuantilesSketch class from the external DataSketches library, so I'd rather stick with its signature. > Quantiles sketch operator > ------------------------- > > Key: APEXMALHAR-2094 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: Sandeep Narayanaswami > Assignee: Sandeep Narayanaswami > Priority: Minor > > An operator that "sketches" in an online fashion the probability distribution > of an input (numeric) data stream, enabling computation of quantiles and > cumulative distribution functions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)