[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341617#comment-15341617
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/301#discussion_r67852390
  
    --- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
    @@ -0,0 +1,204 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.sketches;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +
    +import javax.validation.ConstraintViolationException;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.io.ConsoleOutputOperator;
    +import com.datatorrent.lib.testbench.CollectorTestSink;
    +import com.datatorrent.lib.util.TestUtils;
    +
    +public class QuantilesEstimatorTest
    +{
    +  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
    +
    +  public static class NumberSource extends BaseOperator implements 
InputOperator
    +  {
    +    public final DefaultOutputPort<Double> output = new 
DefaultOutputPort<>();
    +
    +    private Random rand = new Random(1234L);
    +
    +    public NumberSource() {}
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +      output.emit(rand.nextGaussian());
    +    }
    +  }
    +
    +  public static class PmfSink extends BaseOperator
    +  {
    +    public final DefaultInputPort<double[]> input = new 
DefaultInputPort<double[]>()
    +    {
    +      @Override
    +      public void process(double[] tuple) {}
    +    };
    +
    +    public PmfSink() {}
    +  }
    +
    +  public static class QuantileSink extends BaseOperator
    +  {
    +    public final DefaultInputPort<double[]> input = new 
DefaultInputPort<double[]>()
    +    {
    +      @Override
    +      public void process(double[] tuple) {}
    +    };
    +
    +    public QuantileSink() {}
    +  }
    +
    +  public static class CdfSink extends BaseOperator
    +  {
    +    public final DefaultInputPort<Double> input = new 
DefaultInputPort<Double>()
    +    {
    +      @Override
    +      public void process(Double tuple) {}
    +    };
    +
    +    public CdfSink() {}
    +  }
    +
    +  @Test
    +  public void testQuantiles()
    +  {
    +    QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
    +
    +    CollectorTestSink<double[]> sink = new CollectorTestSink<>();
    +    TestUtils.setSink(quantilesOp.quantilesOutput, sink);
    +
    +    Random rand = new Random(1234L);
    +    ArrayList<Double> randArray = new ArrayList<>();
    +
    +    quantilesOp.setup(null);
    +    quantilesOp.beginWindow(0);
    +
    +    for (int i = 0; i < 101; i++) {
    +      double r = rand.nextGaussian();
    +      quantilesOp.data.process(r);
    +      randArray.add(r);
    +    }
    +
    +    quantilesOp.endWindow();
    +
    +    Collections.sort(randArray);
    +
    +    Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
    +    Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
    +  }
    +
    +  @Test
    +  public void testCDF()
    +  {
    +    QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
    +
    +    CollectorTestSink<Double> sink = new CollectorTestSink<>();
    +    TestUtils.setSink(quantilesOp.cdfOutput, sink);
    +
    +    quantilesOp.setup(null);
    +    quantilesOp.beginWindow(0);
    +
    +    for (int i = 0; i < 1001; i++) {
    +      double r = 0.001 * i;
    +      quantilesOp.data.process(r);
    +    }
    +    quantilesOp.endWindow();
    +
    +    List<Double> cdfValues = sink.collectedTuples;
    +    Assert.assertTrue("Highest CDF value is approx 1.0", 
cdfValues.get(cdfValues.size() - 1) >= 0.99 &&
    +        cdfValues.get(cdfValues.size() - 1) <= 1.0);
    +    Assert.assertTrue("Lowest CDF value is approx 0.0", cdfValues.get(0) 
>= 0.0 &&
    +        cdfValues.get(0) <= 0.01);
    +  }
    +
    +  @Test
    +  public void testPMF()
    +  {
    +    QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
    +    double[] intervals = {0.0, 0.20, 0.40, 0.60, 0.80, 1.0};
    +    quantilesOp.setPmfIntervals(intervals);
    +
    +    CollectorTestSink<double[]> sink = new CollectorTestSink<>();
    +    TestUtils.setSink(quantilesOp.pmfOutput, sink);
    +
    +    quantilesOp.setup(null);
    +    quantilesOp.beginWindow(0);
    +    for (int i = 0; i < 1000; i++) {
    +      quantilesOp.data.process(0.001 * i);
    +    }
    +    quantilesOp.endWindow();
    +
    +    double[] finalPmf = 
sink.collectedTuples.get(sink.collectedTuples.size() - 1);
    +    Assert.assertTrue("Probability Mass between 0.0 and 0.2 is approx 
0.2", finalPmf[1] >= 0.19 && finalPmf[1] <= 0.21);
    +  }
    +
    +  public static class Application implements StreamingApplication
    +  {
    +    public void populateDAG(DAG dag, Configuration conf)
    +    {
    +      NumberSource source = dag.addOperator("source", NumberSource.class);
    +      QuantilesEstimator quantilesOp = 
dag.addOperator("quantilesEstimator", QuantilesEstimator.class);
    +      ConsoleOutputOperator cdfConsole = dag.addOperator("cdf output", 
ConsoleOutputOperator.class);
    +      ConsoleOutputOperator quantilesConsole = dag.addOperator("quantiles 
output", ConsoleOutputOperator.class);
    +      cdfConsole.setSilent(true);
    +      quantilesConsole.setSilent(true);
    +
    +      dag.addStream("random number stream", source.output, 
quantilesOp.data);
    +      dag.addStream("cdf", quantilesOp.cdfOutput, cdfConsole.input);
    +      dag.addStream("quantiles", quantilesOp.quantilesOutput, 
quantilesConsole.input);
    +    }
    +  }
    +
    +  @Test
    +  public void testInDAG() throws IOException, Exception
    --- End diff --
    
    Is the purpose of this test just validation?


> Quantiles sketch operator
> -------------------------
>
>                 Key: APEXMALHAR-2094
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Sandeep Narayanaswami
>            Assignee: Sandeep Narayanaswami
>            Priority: Minor
>
> An operator that "sketches" in an online fashion the probability distribution 
> of an input (numeric) data stream, enabling computation of quantiles and 
> cumulative distribution functions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to