This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 02e5c6d  [SYSTEMDS-396] Distinct values count/estimation functions
02e5c6d is described below

commit 02e5c6db0dd5d02416e45874253c59db04151605
Author: Sebastian <[email protected]>
AuthorDate: Tue Jun 2 22:07:34 2020 +0200

    [SYSTEMDS-396] Distinct values count/estimation functions
    
    New function for counting the number of distinct values in a
    MatrixBlock. It uses the builtin AggregateInstructions to pass
    through the hop and lop layers. It can be executed with different
    types of estimators:
    
    - count : The default implementation, which counts by adding values
    to a hash set.
      Not memory efficient, but returns exact counts.
    - KMV : The K Minimum Values estimation algorithm
    - HLL : The HyperLogLog estimation algorithm (not finished)
    
    Closes #909.
---
 .github/workflows/functionsTests.yml               |   1 +
 dev/docs/Tasks.txt                                 |   1 +
 .../java/org/apache/sysds/common/Builtins.java     |   2 +
 src/main/java/org/apache/sysds/common/Types.java   |   4 +-
 .../org/apache/sysds/lops/PartialAggregate.java    |  21 +-
 .../sysds/parser/BuiltinFunctionExpression.java    |   5 +-
 .../org/apache/sysds/parser/DMLTranslator.java     |   2 +
 .../sysds/runtime/functionobjects/Builtin.java     |   3 +-
 .../runtime/instructions/CPInstructionParser.java  |   2 +
 .../cp/AggregateUnaryCPInstruction.java            |  24 +-
 .../matrix/data/LibMatrixCountDistinct.java        | 277 +++++++++++++++++++++
 .../matrix/operators/CountDistinctOperator.java    |  64 +++++
 .../apache/sysds/runtime/util/DataConverter.java   |  20 ++
 src/main/java/org/apache/sysds/utils/Hash.java     | 133 ++++++++++
 src/test/java/org/apache/sysds/test/TestUtils.java |  38 ++-
 .../test/component/matrix/CountDistinctTest.java   | 195 +++++++++++++++
 .../apache/sysds/test/component/misc/UtilHash.java | 106 ++++++++
 .../builtin/BuiltinFactorizationTest.java          |   2 +-
 .../functions/countDistinct/CountDistinct.java     |  49 ++++
 .../countDistinct/CountDistinctApprox.java         |  56 +++++
 .../functions/countDistinct/CountDistinctBase.java | 109 ++++++++
 .../functions/countDistinct/countDistinct.dml      |  24 ++
 .../countDistinct/countDistinctApprox.dml          |  24 ++
 23 files changed, 1150 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/functionsTests.yml 
b/.github/workflows/functionsTests.yml
index b983018..fb1a5bb 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -49,6 +49,7 @@ jobs:
           codegen,
           codegenalg.partone,
           codegenalg.parttwo,
+          countDistinct,
           data.misc,
           data.rand,
           data.tensor,
diff --git a/dev/docs/Tasks.txt b/dev/docs/Tasks.txt
index 9a51eb5..f3d4acd 100644
--- a/dev/docs/Tasks.txt
+++ b/dev/docs/Tasks.txt
@@ -310,6 +310,7 @@ SYSTEMDS-390 New Builtin Functions IV
  * 393 Builtin to find Connected Components of a graph                OK
  * 394 Builtin for one-hot encoding of matrix (not frame), see table  OK
  * 395 SVM rework and utils (confusionMatrix, msvmPredict)            OK
+ * 396 Builtin for counting number of distinct values                 OK
 
 SYSTEMDS-400 Spark Backend Improvements
  * 401 Fix output block indexes of rdiag (diagM2V)                    OK
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java 
b/src/main/java/org/apache/sysds/common/Builtins.java
index 7345077..5ee7a79 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -178,6 +178,8 @@ public enum Builtins {
        TRACE("trace", false),
        TO_ONE_HOT("toOneHot", true),
        TYPEOF("typeOf", false),
+       COUNT_DISTINCT("countDistinct",false),
+       COUNT_DISTINCT_APPROX("countDistinctApprox",false),
        VAR("var", false),
        XOR("xor", false),
        WINSORIZE("winsorize", true, false), //TODO parameterize w/ prob, 
min/max val
diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index 2d66e81..996132f 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -175,7 +175,9 @@ public class Types
                PROD, SUM_PROD,
                MIN, MAX,
                TRACE, MEAN, VAR,
-               MAXINDEX, MININDEX;
+               MAXINDEX, MININDEX,
+               COUNT_DISTINCT,
+               COUNT_DISTINCT_APPROX;
                
                @Override
                public String toString() {
diff --git a/src/main/java/org/apache/sysds/lops/PartialAggregate.java 
b/src/main/java/org/apache/sysds/lops/PartialAggregate.java
index 576a5e3..bfec9ff 100644
--- a/src/main/java/org/apache/sysds/lops/PartialAggregate.java
+++ b/src/main/java/org/apache/sysds/lops/PartialAggregate.java
@@ -19,15 +19,14 @@
 
 package org.apache.sysds.lops;
 
-import org.apache.sysds.hops.HopsException;
-import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
- 
-import org.apache.sysds.lops.LopProperties.ExecType;
 import org.apache.sysds.common.Types.AggOp;
 import org.apache.sysds.common.Types.CorrectionLocationType;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.Direction;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysds.hops.HopsException;
+import org.apache.sysds.lops.LopProperties.ExecType;
 
 
 /**
@@ -77,7 +76,7 @@ public class PartialAggregate extends Lop
                input.addOutput(this);
                lps.setProperties(inputs, et);
        }
-       
+
        /**
         * This method computes the location of "correction" terms in the output
         * produced by PartialAgg instruction.
@@ -340,6 +339,18 @@ public class PartialAggregate extends Lop
                                        return "uaktrace";
                                break;
                        }
+
+                       case COUNT_DISTINCT: {
+                               if(dir == Direction.RowCol )
+                                       return "uacd";
+                               break;
+                       }
+                       
+                       case COUNT_DISTINCT_APPROX: {
+                               if(dir == Direction.RowCol )
+                                       return "uacdap";
+                               break;
+                       }
                }
                
                //should never come here for normal compilation
diff --git 
a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java 
b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index 2c5d61a..96e2ebc 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -912,9 +912,10 @@ public class BuiltinFunctionExpression extends 
DataIdentifier
                case NROW:
                case NCOL:
                case LENGTH:
+               case COUNT_DISTINCT:
+               case COUNT_DISTINCT_APPROX:
                        checkNumParameters(1);
-                       checkDataTypeParam(getFirstExpr(),
-                               DataType.MATRIX, DataType.FRAME, DataType.LIST);
+                       checkDataTypeParam(getFirstExpr(), DataType.MATRIX);
                        output.setDataType(DataType.SCALAR);
                        output.setDimensions(0, 0);
                        output.setBlocksize(0);
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index de1e3ce..b8c7bcf 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -2313,6 +2313,8 @@ public class DMLTranslator
                case SUM:
                case PROD:
                case VAR:
+               case COUNT_DISTINCT:
+               case COUNT_DISTINCT_APPROX:
                        currBuiltinOp = new AggUnaryOp(target.getName(), 
DataType.SCALAR, target.getValueType(),
                                AggOp.valueOf(source.getOpCode().name()), 
Direction.RowCol, expr);
                        break;
diff --git 
a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java 
b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
index 8cac05c..7e5b73a 100644
--- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
+++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
@@ -50,7 +50,8 @@ public class Builtin extends ValueFunction
        public enum BuiltinCode { SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, 
ATAN, LOG, LOG_NZ, MIN,
                MAX, ABS, SIGN, SQRT, EXP, PLOGP, PRINT, PRINTF, NROW, NCOL, 
LENGTH, LINEAGE, ROUND, MAXINDEX, MININDEX,
                STOP, CEIL, FLOOR, CUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD, 
INVERSE, SPROP, SIGMOID, EVAL, LIST,
-               TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID }
+               TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID, 
COUNT_DISTINCT, COUNT_DISTINCT_APPROX}
+               
        public BuiltinCode bFunc;
        
        private static final boolean FASTMATH = true;
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java 
b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
index c613222..d4a78ef 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
@@ -107,6 +107,8 @@ public class CPInstructionParser extends InstructionParser
                String2CPInstructionType.put( "length"  ,CPType.AggregateUnary);
                String2CPInstructionType.put( "exists"  ,CPType.AggregateUnary);
                String2CPInstructionType.put( "lineage" ,CPType.AggregateUnary);
+               String2CPInstructionType.put( "uacd"    , 
CPType.AggregateUnary);
+               String2CPInstructionType.put( "uacdap"  , 
CPType.AggregateUnary);
 
                String2CPInstructionType.put( "uaggouterchain", 
CPType.UaggOuterChain);
                
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
index 5f053e9..100251d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
@@ -31,9 +31,11 @@ import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageItemUtils;
+import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.SimpleOperator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
@@ -42,7 +44,8 @@ import org.apache.sysds.utils.Explain;
 public class AggregateUnaryCPInstruction extends UnaryCPInstruction
 {
        public enum AUType {
-               NROW, NCOL, LENGTH, EXISTS, LINEAGE,
+               NROW, NCOL, LENGTH, EXISTS, LINEAGE, 
+               COUNT_DISTINCT, COUNT_DISTINCT_APPROX,
                DEFAULT;
                public boolean isMeta() {
                        return this != DEFAULT;
@@ -72,6 +75,14 @@ public class AggregateUnaryCPInstruction extends 
UnaryCPInstruction
                        || opcode.equalsIgnoreCase("lineage")){
                        return new AggregateUnaryCPInstruction(new 
SimpleOperator(Builtin.getBuiltinFnObject(opcode)),
                                in1, out, AUType.valueOf(opcode.toUpperCase()), 
opcode, str);
+               } 
+               else if(opcode.equalsIgnoreCase("uacd")){
+                       return new AggregateUnaryCPInstruction(new 
SimpleOperator(null),
+                       in1, out, AUType.COUNT_DISTINCT, opcode, str);
+               }
+               else if(opcode.equalsIgnoreCase("uacdap")){
+                       return new AggregateUnaryCPInstruction(new 
SimpleOperator(null),
+                       in1, out, AUType.COUNT_DISTINCT_APPROX, opcode, str);
                }
                else { //DEFAULT BEHAVIOR
                        AggregateUnaryOperator aggun = InstructionUtils
@@ -152,6 +163,17 @@ public class AggregateUnaryCPInstruction extends 
UnaryCPInstruction
                                ec.setScalarOutput(output_name, new 
StringObject(Explain.explain(li)));
                                break;
                        }
+                       case COUNT_DISTINCT:
+                       case COUNT_DISTINCT_APPROX: {
+                               if( 
!ec.getVariables().keySet().contains(input1.getName()) )
+                                       throw new DMLRuntimeException("Variable 
'" + input1.getName() + "' does not exist.");
+                               MatrixBlock input = 
ec.getMatrixInput(input1.getName());
+                               CountDistinctOperator op = new 
CountDistinctOperator(_type);
+                               int res = 
LibMatrixCountDistinct.estimateDistinctValues(input, op);
+                               ec.releaseMatrixInput(input1.getName());
+                               ec.setScalarOutput(output_name, new 
IntObject(res));
+                               break;
+                       }
                        default: {
                                AggregateUnaryOperator au_op = 
(AggregateUnaryOperator) _optr;
                                if (input1.getDataType() == DataType.MATRIX) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixCountDistinct.java
 
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixCountDistinct.java
new file mode 100644
index 0000000..73e89e8
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixCountDistinct.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
+import 
org.apache.sysds.runtime.matrix.operators.CountDistinctOperator.CountDistinctTypes;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains various methods for counting the number of distinct 
values inside a MatrixBlock
+ */
+public class LibMatrixCountDistinct {
+
+       // ------------------------------
+       // Logging parameters:
+       // local debug flag
+       private static final boolean LOCAL_DEBUG = false;
+       // DEBUG/TRACE for details
+       private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+       private static final Log LOG = 
LogFactory.getLog(LibMatrixCountDistinct.class.getName());
+
+       static {
+               // for internal debugging only
+               if(LOCAL_DEBUG) {
+                       
Logger.getLogger("org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct").setLevel(LOCAL_DEBUG_LEVEL);
+               }
+       }
+       // ------------------------------
+
+       /**
+        * The minimum number of non-zero cells in the input before 
approximate techniques are used for counting the number of
+        * distinct values.
+        */
+       public static int minimumSize = 1024;
+
+       private LibMatrixCountDistinct() {
+               // Prevent instantiation via private constructor.
+       }
+
+       /**
+        * Public method to count the number of distinct values inside a 
matrix. Depending on which CountDistinctOperator
+        * is selected, it returns either the exact count or an estimated value.
+        * 
+        * TODO: Support counting num distinct in rows, or columns axis.
+        * 
+        * TODO: Add support for distributed spark operations
+        * 
+        * TODO: If the MatrixBlock type is CompressedMatrix, simply read the 
values from the ColGroups.
+        * 
+        * @param in the input matrix to count number distinct values in
+        * @param op the selected operator to use
+        * @return the distinct count
+        */
+       public static int estimateDistinctValues(MatrixBlock in, 
CountDistinctOperator op) {
+               int res = 0;
+               if(op.operatorType == CountDistinctTypes.KMV &&
+                       (op.hashType == HashType.ExpHash || op.hashType == 
HashType.StandardJava)) {
+                       throw new DMLException("Invalid hashing configuration 
using " + op.hashType + " and " + op.operatorType);
+               }
+               else if(op.operatorType == CountDistinctTypes.HLL) {
+                       throw new NotImplementedException("HyperLogLog not 
implemented");
+               }
+               // shortcut in simplest case.
+               if( in.getLength() == 1 || in.isEmpty() )
+                       return 1;
+               else if( in.getNonZeros() < minimumSize ) {
+                       // Just use naive implementation if the number of 
nonZeros values size is small.
+                       res = countDistinctValuesNaive(in);
+               }
+               else {
+                       switch(op.operatorType) {
+                               case COUNT:
+                                       res = countDistinctValuesNaive(in);
+                                       break;
+                               case KMV:
+                                       res = countDistinctValuesKVM(in, op);
+                                       break;
+                               default:
+                                       throw new DMLException("Invalid or not 
implemented Estimator Type");
+                       }
+               }
+               
+               if(res == 0)
+                       throw new DMLRuntimeException("Imposible estimate of 
distinct values");
+               return res;
+       }
+
+       /**
+        * Naive implementation of counting Distinct values.
+        * 
+        * Benefit: precise, but uses memory on the scale of the input's number of 
distinct values.
+        * 
+        * @param in The input matrix to count number distinct values in
+        * @return The absolute distinct count
+        */
+       private static int countDistinctValuesNaive(MatrixBlock in) {
+               Set<Double> distinct = new HashSet<>();
+
+               // TODO performance: direct sparse block /dense block access
+               if(in.isInSparseFormat()) {
+                       Iterator<IJV> it = in.getSparseBlockIterator();
+                       while(it.hasNext()) {
+                               distinct.add(it.next().getV());
+                       }
+                       if( in.getNonZeros() < in.getLength() )
+                               distinct.add(0d);
+               }
+               else {
+                       //TODO fix for large dense blocks, where this call will 
fail
+                       double[] data = in.getDenseBlockValues();
+                       if(data == null) {
+                               throw new DMLRuntimeException("Not valid 
execution");
+                       }
+                       //TODO avoid redundantly adding zero if not entirly 
dense
+                       for(double v : data) {
+                               distinct.add(v);
+                       }
+               }
+               return distinct.size();
+       }
+
+       /**
+        * KMV synopsis(for k minimum values) Distinct-Value Estimation
+        * 
+        * Kevin S. Beyer, Peter J. Haas, Berthold Reinwald, Yannis Sismanis, 
Rainer Gemulla:
+        * 
+        * On synopses for distinct‐value estimation under multiset operations. 
SIGMOD 2007
+        * 
+        * TODO: Add multi-threaded version
+        * 
+        * @param in The Matrix Block to estimate the number of distinct values 
in
+        * @return The distinct count estimate
+        */
+       private static int countDistinctValuesKVM(MatrixBlock in, 
CountDistinctOperator op) {
+
+               // D is the number of possible distinct values in the 
MatrixBlock.
+               // plus 1 to take account of 0 input.
+               long D = in.getNonZeros() + 1;
+
+               /**
+                * To ensure that the likelihood to hash to the same value we 
need O(D^2) positions to hash to assign. If the
+                * value is higher than int (which is the area we hash to) then 
use Integer Max value as largest hashing space.
+                */
+               long tmp = D * D;
+               int M = (tmp > (long) Integer.MAX_VALUE) ? Integer.MAX_VALUE : 
(int) tmp;
+               LOG.debug("M not forced to int size: " + tmp);
+               LOG.debug("M: " + M);
+               /**
+                * The estimator is asymptotically unbiased as k becomes large, 
but memory usage also scales with k. Furthermore
+                * k value must be within range: D >> k >> 0
+                */
+               int k = D > 64 ? 64 : (int) D;
+               SmallestPriorityQueue spq = new SmallestPriorityQueue(k);
+
+               if(in.isInSparseFormat()) {
+                       Iterator<IJV> it = in.getSparseBlockIterator();
+                       while(it.hasNext()) {
+                               double fullValue = it.next().getV();
+                               int hash = Hash.hash(fullValue, op.hashType);
+                               // Since Java does not have unsigned integer, 
the hash value is abs.
+                               int v = (Math.abs(hash)) % (M - 1) + 1;
+                               spq.add(v);
+                       }
+                       if( in.getNonZeros() < in.getLength() )
+                               spq.add(Hash.hash(0d, op.hashType));
+               }
+               else {
+                       //TODO fix for large dense blocks, where this call will 
fail
+                       double[] data = in.getDenseBlockValues();
+                       for(double fullValue : data) {
+                               int hash = Hash.hash(fullValue, op.hashType);
+                               int v = (Math.abs(hash)) % (M - 1) + 1;
+                               spq.add(v);
+                       }
+               }
+
+               LOG.debug("M: " + M);
+               LOG.debug("smallest hash:" + spq.peek());
+               LOG.debug("spq: " + spq.toString());
+
+               if(spq.size() < k) {
+                       return spq.size();
+               }
+               else {
+                       double U_k = (double) spq.poll() / (double) M;
+                       LOG.debug("U_k : " + U_k);
+                       double estimate = (double) (k - 1) / U_k;
+                       LOG.debug("Estimate: " + estimate);
+                       double ceilEstimate = Math.min(estimate, (double) D);
+                       LOG.debug("Ceil worst case: " + ceilEstimate);
+                       return (int) ceilEstimate;
+               }
+       }
+
+       /**
+        * Despite the deceiving name, this class is used to hold the k smallest values 
inserted.
+        * 
+        * TODO: add utility method to join two partitions
+        * 
+        * TODO: Replace Standard Java Set and Priority Queue with optimized 
versions.
+        */
+       private static class SmallestPriorityQueue {
+               private Set<Integer> containedSet;
+               private PriorityQueue<Integer> smallestHashes;
+               private int k;
+
+               public SmallestPriorityQueue(int k) {
+                       smallestHashes = new PriorityQueue<>(k, 
Collections.reverseOrder());
+                       containedSet = new HashSet<>(1);
+                       this.k = k;
+               }
+
+               public void add(int v) {
+                       if(!containedSet.contains(v)) {
+                               if(smallestHashes.size() < k) {
+                                       smallestHashes.add(v);
+                                       containedSet.add(v);
+                               }
+                               else if(v < smallestHashes.peek()) {
+                                       LOG.trace(smallestHashes.peek() + " -- 
" + v);
+                                       smallestHashes.add(v);
+                                       containedSet.add(v);
+                                       
containedSet.remove(smallestHashes.poll());
+                               }
+                       }
+               }
+
+               public int size() {
+                       return smallestHashes.size();
+               }
+
+               public int peek() {
+                       return smallestHashes.peek();
+               }
+
+               public int poll() {
+                       return smallestHashes.poll();
+               }
+
+               @Override
+               public String toString() {
+                       return smallestHashes.toString();
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/operators/CountDistinctOperator.java
 
b/src/main/java/org/apache/sysds/runtime/matrix/operators/CountDistinctOperator.java
new file mode 100644
index 0000000..3f63ef9
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/matrix/operators/CountDistinctOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.operators;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import 
org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.utils.Hash.HashType;
+
+public class CountDistinctOperator extends Operator {
+       private static final long serialVersionUID = 7615123453265129670L;
+
+       public final CountDistinctTypes operatorType;
+       public final HashType hashType;
+
+       public enum CountDistinctTypes { // The different supported types of 
counting.
+               COUNT, // Baseline naive implementation, iterate though, add to 
hashMap.
+               KMV, // K-Minimum Values algorithm.
+               HLL // HyperLogLog algorithm.
+       }
+
+       public CountDistinctOperator(AUType opType) {
+               super(true);
+               switch (opType) {
+                       case COUNT_DISTINCT:
+                               this.operatorType = CountDistinctTypes.COUNT;
+                               break;
+                       case COUNT_DISTINCT_APPROX:
+                               this.operatorType = CountDistinctTypes.KMV;
+                               break;
+                       default:
+                               throw new DMLRuntimeException(opType + " not 
supported for CountDistinct Operator");
+               }
+               this.hashType = HashType.LinearHash;
+       }
+
+       public CountDistinctOperator(CountDistinctTypes operatorType) {
+               super(true);
+               this.operatorType = operatorType;
+               this.hashType = HashType.StandardJava;
+       }
+
+       public CountDistinctOperator(CountDistinctTypes operatorType, HashType 
hashType) {
+               super(true);
+               this.operatorType = operatorType;
+               this.hashType = hashType;
+       }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java 
b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
index dfb369d..3ac19f2 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
@@ -457,6 +457,26 @@ public class DataConverter
        }
 
        /**
+        * Converts an integer matrix to a MatrixBlock
+        * 
+        * @param data Int matrix input that is converted to double MatrixBlock
+        * @return The matrixBlock constructed.
+        */
+       public static MatrixBlock convertToMatrixBlock(int[][] data){
+               int rows = data.length;
+               int cols = (rows > 0)? data[0].length : 0;
+               MatrixBlock res = new MatrixBlock(rows, cols, false);
+               for(int row = 0; row< data.length; row++){
+                       for(int col = 0; col < cols; col++){
+                               double v = data[row][col];
+                               if( v != 0 )
+                                       res.appendValue(row, col, v);
+                       }
+               }
+               return res;
+       }
+
+       /**
         * Creates a dense Matrix Block and copies the given double vector into 
it.
         * 
         * @param data double array
diff --git a/src/main/java/org/apache/sysds/utils/Hash.java 
b/src/main/java/org/apache/sysds/utils/Hash.java
new file mode 100644
index 0000000..3bc3ca7
--- /dev/null
+++ b/src/main/java/org/apache/sysds/utils/Hash.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.utils;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+/**
+ * A class containing different hashing functions.
+ */
+public class Hash {
+
+       /**
+        * Available Hashing techniques
+        */
+       public enum HashType {
+               StandardJava, LinearHash, ExpHash
+       }
+
+       /**
+        * A fixed array of random 32-bit masks (except the first value, which has all bits set) used for Linear and Exp hashing, 
to the integer domain.
+        */
+       private static final int[] a = {0xFFFFFFFF, 0xB7825CBC, 0x10FA23F2, 
0xD54E1532, 0x7590E53C, 0xECE6F631, 0x8954BF60,
+               0x5BE38B88, 0xCA1D3AC0, 0xB2726F8E, 0xBADE7E7A, 0xCACD1184, 
0xFB32BDAD, 0x2936C9D7, 0xB5B88D37, 0xD272D353,
+               0xE139A063, 0xDACF6B87, 0x3568D521, 0x75C619EA, 0x7C2B8CBD, 
0x012C3C7F, 0x0A621C37, 0x77274A12, 0x731D379A,
+               0xE45E0D3B, 0xEAB4AE13, 0x10C440C7, 0x50CF2899, 0xD865BD46, 
0xAABDF34F, 0x218FA0C3,};
+
+       /**
+        * Generic hashing of Java objects; not ideal for specific value types, so 
use the type-specific methods where available.
+        * 
+        * To use the locality sensitive techniques, override the object's 
hashCode method.
+        * 
+        * @param o  The Object to hash.
+        * @param ht The HashType to use.
+        * @return An int Hash value.
+        */
+       public static int hash(Object o, HashType ht) {
+               int hashcode = o.hashCode();
+               switch(ht) {
+                       case StandardJava:
+                               return hashcode;
+                       case LinearHash:
+                               return linearHash(hashcode);
+                       case ExpHash:
+                               return expHash(hashcode);
+                       default:
+                               throw new NotImplementedException("Not 
Implemented hashing combination");
+               }
+       }
+
+       /**
+        * Hash function for double values (supports StandardJava and LinearHash; other hash types throw).
+        * 
+        * @param o  The double value.
+        * @param ht The hashing function to apply.
+        * @return An int Hash value.
+        */
+       public static int hash(double o, HashType ht) {
+               switch(ht) {
+                       case StandardJava:
+                               // Here just for reference
+                               return new Double(o).hashCode();
+                       case LinearHash:
+                               // Although Linear Hashing is locality sensitive, 
it is not in this case
+                               // since the bit positions of the double value 
are split into exponent and mantissa.
+                               // If the locality sensitive aspect is required, 
use linear hash on a double value rounded to integer.
+                               long v = Double.doubleToLongBits(o);
+                               return linearHash((int) (v ^ (v >>> 32)));
+                       default:
+                               throw new NotImplementedException("Not 
Implemented hashing combination for double value");
+               }
+       }
+
+       /**
+        * Compute the Linear hash of an int input value.
+        * 
+        * @param v The value to hash.
+        * @return The int hash.
+        */
+       public static int linearHash(int v) {
+               return linearHash(v, a.length);
+       }
+
+       /**
+        * Compute the Linear hash of an int input value, but only use the 
first bits of the linear hash. Each output bit is the parity of (a[i] &amp; v).
+        * 
+        * @param v    The value to hash.
+        * @param bits The number of bits to use, at most a.length (32); larger values index past the mask array.
+        * @return The hashed value, built from the most significant used bit down.
+        */
+       public static int linearHash(int v, int bits) {
+               int res = 0;
+               for(int i = 0; i < bits; i++) {
+                       res = (res << 1) + (Long.bitCount(a[i] & v) & 1);
+               }
+               return res;
+       }
+
+       /**
+        * Compute exponentially distributed hash values in range 1..a.length
+        * 
+        * eg: roughly 50% == 1, 25% == 2, 12.5% == 3 etc.
+        * 
+        * Useful because you can estimate the size of a collection by only 
maintaining the highest value found from this hash.
+        * 
+        * @param x value to hash
+        * @return a hash value byte: 1 + index of the first mask in a with odd parity against x, or a.length if none
+        */
+       public static byte expHash(int x) {
+               for(int value = 0; value < a.length; value++) {
+                       int dot = Long.bitCount(a[value] & x) & 1;
+                       if(dot != 0)
+                               return (byte) (value + 1);
+               }
+               return (byte) a.length;
+       }
+}
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java 
b/src/test/java/org/apache/sysds/test/TestUtils.java
index 0596668..8696442 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -779,7 +779,7 @@ public class TestUtils
         * 
         * @param d1 The expected value.
         * @param d2 The actual value.
-        * @return Whether they are equal or not.
+        * @return The distance in bits
         */
        public static long compareScalarBits(double d1, double d2) {
                long expectedBits = Double.doubleToLongBits(d1) < 0 ? 
0x8000000000000000L - Double.doubleToLongBits(d1) : Double.doubleToLongBits(d1);
@@ -1386,6 +1386,42 @@ public class TestUtils
        }
 
        /**
+        * 
+        * Generates a test matrix, but only containing whole numbers, in the 
range specified.
+        * 
+        * @param rows number of rows
+        * @param cols number of columns
+        * @param min minimum value whole number (inclusive)
+        * @param max maximum value whole number (exclusive, unless max == min)
+        * @param sparsity probability that a cell is filled; other cells stay 0
+        * @param seed seed for the random generator, -1 uses the shared TestUtils.random
+        * @return random matrix containing whole numbers in the range 
specified.
+        */
+       public static int[][] generateTestMatrixIntV(int rows, int cols, int 
min, int max, double sparsity, long seed) {
+               int[][] matrix = new int[rows][cols];
+               Random random = (seed == -1) ? TestUtils.random : new 
Random(seed);
+               if (max - min != 0){
+                       for (int i = 0; i < rows; i++) {
+                               for (int j = 0; j < cols; j++) {
+                                       if (random.nextDouble() > sparsity)
+                                               continue;
+                                       matrix[i][j] = (random.nextInt((max - 
min)) + min);
+                               }
+                       }
+               } else{
+                       for (int i = 0; i < rows; i++) {
+                               for (int j = 0; j < cols; j++) {
+                                       if (random.nextDouble() > sparsity)
+                                               continue;
+                                       matrix[i][j] = max;
+                               }
+                       }
+               }
+
+               return matrix;
+       }
+
+       /**
         * <p>
         * Generates a test matrix with the specified parameters as a two
         * dimensional array. The matrix will not contain any zero values.
diff --git 
a/src/test/java/org/apache/sysds/test/component/matrix/CountDistinctTest.java 
b/src/test/java/org/apache/sysds/test/component/matrix/CountDistinctTest.java
new file mode 100644
index 0000000..a8e3e2b
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/matrix/CountDistinctTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.matrix;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
+import 
org.apache.sysds.runtime.matrix.operators.CountDistinctOperator.CountDistinctTypes;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.Hash.HashType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class CountDistinctTest {
+
+       private static CountDistinctTypes[] esT = new CountDistinctTypes[] {
+               // The different types of Estimators
+               CountDistinctTypes.COUNT, 
+               CountDistinctTypes.KMV,
+               CountDistinctTypes.HLL
+       };
+
+       @Parameters
+       public static Collection<Object[]> data() {
+               ArrayList<Object[]> tests = new ArrayList<>();
+               ArrayList<MatrixBlock> inputs = new ArrayList<>();
+               ArrayList<Long> actualUnique = new ArrayList<>();
+
+               // single value matrix.
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1, 
1, 0.0, 100.0, 1, 7)));
+               actualUnique.add(1L);
+
+               // single column or row matrix.
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1, 
100, 0.0, 100.0, 1, 7)));
+               actualUnique.add(100L);
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(100, 
1, 0.0, 100.0, 1, 7)));
+               actualUnique.add(100L);
+
+               // Sparse Multicol random values (most likely each value is 
unique)
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(100, 
10, 0.0, 100.0, 0.1, 7)));
+               actualUnique.add(98L); //dense representation
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(100, 
1000, 0.0, 100.0, 0.1, 7)));
+               actualUnique.add(9823L+1); //sparse representation
+
+               // MultiCol Inputs (using integers)
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(5000,
 5000, 1, 100, 1, 8)));
+               actualUnique.add(99L);
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024,
 10240, 1, 100, 1, 7)));
+               actualUnique.add(99L);
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240,
 1024, 1, 100, 1, 7)));
+               actualUnique.add(99L);
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024,
 10241, 1, 1500, 1, 7)));
+               actualUnique.add(1499L);
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024,
 10241, 0, 3000, 1, 7)));
+               actualUnique.add(3000L);
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024,
 10241, 0, 6000, 1, 7)));
+               actualUnique.add(6000L);
+
+               // Sparse Inputs
+               
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024,
 10241, 0, 3000, 0.1, 7)));
+               actualUnique.add(3000L);
+               // 
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240,
 10241, 0, 5000, 0.1, 7)));
+               // actualUnique.add(5000L);
+               // 
inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240,
 10241, 0, 10000, 0.1, 7)));
+               // actualUnique.add(10000L);
+
+               for(CountDistinctTypes et : esT) {
+                       for(HashType ht : HashType.values()) {
+                               if((ht == HashType.ExpHash && et == 
CountDistinctTypes.KMV) ||
+                                       (ht == HashType.StandardJava && et == 
CountDistinctTypes.KMV)) {
+                                       String errorMessage = "Invalid hashing 
configuration using " + ht + " and " + et;
+                                       tests.add(new Object[] {et, 
inputs.get(0), actualUnique.get(0), ht, DMLException.class,
+                                               errorMessage, 0.0});
+                               }
+                               else if(et == CountDistinctTypes.HLL) {
+                                       tests.add(new Object[] {et, 
inputs.get(0), actualUnique.get(0), ht, NotImplementedException.class,
+                                               "HyperLogLog not implemented", 
0.0});
+                               }
+                               else if (et != CountDistinctTypes.COUNT) {
+                                       for(int i = 0; i < inputs.size(); i++) {
+                                               // allowing the estimate to be 
15% off
+                                               tests.add(new Object[] {et, 
inputs.get(i), actualUnique.get(i), ht, null, null, 0.15});
+                                       }
+                               }
+                       }
+                       if (et == CountDistinctTypes.COUNT){
+                               for(int i = 0; i < inputs.size(); i++) {
+                                       tests.add(new Object[] {et, 
inputs.get(i), actualUnique.get(i), null, null, null, 0.0001});
+                               }
+                       }
+               }
+               return tests;
+       }
+
+       @Parameterized.Parameter
+       public CountDistinctTypes et;
+       @Parameterized.Parameter(1)
+       public MatrixBlock in;
+       @Parameterized.Parameter(2)
+       public long nrUnique;
+       @Parameterized.Parameter(3)
+       public HashType ht;
+
+       // Exception handling
+       @Parameterized.Parameter(4)
+       public Class<? extends Exception> expectedException;
+       @Parameterized.Parameter(5)
+       public String expectedExceptionMsg;
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       // relative tolerance allowed for the estimate (e.g. 0.15 == within 15% of target).
+       @Parameterized.Parameter(6)
+       public double epsilon;
+
+       @Test
+       public void testEstimation() {
+
+               // setup expected exception
+               if(expectedException != null) {
+                       thrown.expect(expectedException);
+                       thrown.expectMessage(expectedExceptionMsg);
+               }
+
+               Integer out = 0;
+               CountDistinctOperator op = new CountDistinctOperator(et, ht);
+               try {
+                       out = LibMatrixCountDistinct.estimateDistinctValues(in, 
op);
+               }
+               catch(DMLException e) {
+                       throw e;
+               }
+               catch(NotImplementedException e) {
+                       throw e;
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       Assert.assertTrue(this.toString(), false);
+               }
+
+               int count = out;
+               boolean success = Math.abs(nrUnique - count) <= nrUnique * 
epsilon;
+               StringBuilder sb = new StringBuilder();
+               sb.append(this.toString());
+               sb.append("\n" + count + " unique values, actual:" + nrUnique + 
" with eps of " + epsilon);
+               Assert.assertTrue(sb.toString(), success);
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(et);
+               if(ht != null){
+                       sb.append("-" + ht);
+               }
+               sb.append("  nrUnique:" + nrUnique);
+               sb.append(" & input size:" + in.getNumRows() + "," + 
in.getNumColumns());
+               sb.append(" sparse: " + in.isInSparseFormat());
+               if(expectedException != null) {
+                       sb.append("\nExpected Exception: " + expectedException);
+                       sb.append("\nExpected Exception Msg: " + 
expectedExceptionMsg);
+               }
+               return sb.toString();
+       }
+}
diff --git a/src/test/java/org/apache/sysds/test/component/misc/UtilHash.java 
b/src/test/java/org/apache/sysds/test/component/misc/UtilHash.java
new file mode 100644
index 0000000..0e07d6c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/misc/UtilHash.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.misc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class UtilHash {
+
+       @Parameters
+       public static Collection<Object[]> data() {
+               ArrayList<Object[]> tests = new ArrayList<>();
+               tests.add(new Object[] {100, 2, 0.0, 1.0});
+               tests.add(new Object[] {100, 5, Double.MIN_VALUE, 
Double.MAX_VALUE});
+               tests.add(new Object[] {1000, 50, Double.MIN_VALUE, 
Double.MAX_VALUE});
+               tests.add(new Object[] {10000, 500, Double.MIN_VALUE, 
Double.MAX_VALUE});
+               tests.add(new Object[] {1000, 500, Double.MIN_VALUE, 
Double.MAX_VALUE});
+               tests.add(new Object[] {1000, 500, 0.0, 1.0});
+               tests.add(new Object[] {1000, 500, 0.0, 100.0});
+               tests.add(new Object[] {1000, 500, 0.0, 0.0000001});
+               tests.add(new Object[] {1000, 1000, 0.0, 0.00000001});
+               tests.add(new Object[] {1000000, 1000000, 0.0, 0.00000001});
+
+               ArrayList<Object[]> actualTests = new ArrayList<>();
+
+               Set<HashType> validHashTypes = new HashSet<>();
+               for(HashType ht : HashType.values()) validHashTypes.add(ht);
+               validHashTypes.remove(HashType.ExpHash);
+
+               for(HashType ht : validHashTypes) {
+                       for(int i = 0; i < tests.size(); i++) {
+                               actualTests.add(new Object[] {tests.get(i)[0], 
tests.get(i)[1], tests.get(i)[2], tests.get(i)[3], ht});
+                       }
+               }
+
+               return actualTests;
+       }
+
+       @Parameterized.Parameter
+       public int nrKeys = 1000;
+       @Parameterized.Parameter(1)
+       public int nrBuckets = 50;
+       @Parameterized.Parameter(2)
+       public double min;
+       @Parameterized.Parameter(3)
+       public double max;
+       @Parameterized.Parameter(4)
+       public HashType ht;
+
+       private double epsilon = 0.05;
+
+       @Test
+       public void chiSquaredTest() {
+               // https://en.wikipedia.org/wiki/Hash_function#Uniformity
+
+               double[] input = TestUtils.generateTestMatrix(1, nrKeys, min, 
max, 1.0, 10)[0];
+
+               int[] buckets = new int[nrBuckets];
+
+               for(double x : input) {
+                       int hv = Hash.hash(new Double(x), ht);
+                       buckets[Math.abs(hv % nrBuckets)] += 1;
+               }
+
+               double top = 0;
+               for(int b : buckets) {
+                       top += (double) (b) * (double) (b + 1.0) / 2.0;
+               }
+
+               double res = top / ((nrKeys / (2.0 * nrBuckets)) * (nrKeys + 
2.0 * nrBuckets - 1));
+
+               boolean success = Math.abs(res - 1) <= epsilon;
+
+               Assert.assertTrue("Chi squared hashing test: " + res + " should 
be close to 1, with hashing: " + ht, success);
+       }
+
+}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinFactorizationTest.java
 
b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinFactorizationTest.java
index 6510b6d..1da15bb 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinFactorizationTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinFactorizationTest.java
@@ -111,7 +111,7 @@ public class BuiltinFactorizationTest extends 
AutomatedTestBase
                        OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = 
rewrites;
                        
                        //generate input and write incl meta data
-                       double[][] Xa = TestUtils.generateTestMatrix(rows, 
cols, 1, 10, sparsity, 7);
+                       double[][] Xa = TestUtils.generateTestMatrix(rows, 
cols, 1.0, 10.0, sparsity, 7);
                        writeInputMatrixWithMTD("X", Xa, true);
                        
                        //run test case
diff --git 
a/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinct.java
 
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinct.java
new file mode 100644
index 0000000..74772e0
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinct.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.countDistinct;
+
+import org.apache.sysds.lops.LopProperties;
+import org.junit.Test;
+
+public class CountDistinct extends CountDistinctBase {
+
+       public String TEST_NAME = "countDistinct";
+       public String TEST_DIR = "functions/countDistinct/";
+       public String TEST_CLASS_DIR = TEST_DIR + 
CountDistinct.class.getSimpleName() + "/";
+
+       protected String getTestClassDir() {
+               return TEST_CLASS_DIR;
+       }
+
+       protected String getTestName() {
+               return TEST_NAME;
+       }
+
+       protected String getTestDir() {
+               return TEST_DIR;
+       }
+
+       @Test
+       public void testSimple1by1() {
+               // test simple 1 by 1.
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               countDistinctTest(1, 1, 1, ex, 0.00001);
+       }
+}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctApprox.java
 
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctApprox.java
new file mode 100644
index 0000000..8d0d242
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctApprox.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.countDistinct;
+
+import org.apache.sysds.lops.LopProperties;
+import org.junit.Test;
+
+public class CountDistinctApprox extends CountDistinctBase {
+
+       private final static String TEST_NAME = "countDistinctApprox";
+       private final static String TEST_DIR = "functions/countDistinct/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
CountDistinctApprox.class.getSimpleName() + "/";
+
+       public CountDistinctApprox(){
+               percentTolerance = 0.1;
+       }
+
+       @Test
+       public void testXXLarge() {
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               double tolerance = 9000 * percentTolerance;
+               countDistinctTest(9000, 10000, 5000, ex, tolerance);
+       }
+
+       @Override
+       protected String getTestClassDir() {
+               return TEST_CLASS_DIR;
+       }
+
+       @Override
+       protected String getTestName() {
+               return TEST_NAME;
+       }
+
+       @Override
+       protected String getTestDir() {
+               return TEST_DIR;
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctBase.java
 
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctBase.java
new file mode 100644
index 0000000..6a9b096
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctBase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.countDistinct;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+public abstract class CountDistinctBase extends AutomatedTestBase {
+
+       protected abstract String getTestClassDir();
+       protected abstract String getTestName();
+       protected abstract String getTestDir();
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(getTestName(), new 
TestConfiguration(getTestClassDir(), getTestName(), new String[] { "A.scalar" 
}));
+       }
+
+       protected double percentTolerance = 0.0;
+       protected double baseTolerance = 0.0001;
+
+       @Test
+       public void testSmall() {
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               double tolerance = baseTolerance  + 50 *  percentTolerance;
+               countDistinctTest(50, 50, 50, ex,tolerance);
+       }
+
+       @Test
+       public void testLarge() {
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               double tolerance = baseTolerance + 800 *  percentTolerance;
+               countDistinctTest(800, 1000, 1000, ex,tolerance);
+       }
+
+       @Test
+       public void testXLarge() {
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               double tolerance = baseTolerance + 1723 *  percentTolerance;
+               countDistinctTest(1723, 5000, 2000, ex,tolerance);
+       }
+
+       @Test
+       public void test1Unique() {
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               double tolerance = 0.00001;
+               countDistinctTest(1, 100, 1000, ex,tolerance);
+       }
+
+       @Test
+       public void test2Unique() {
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               double tolerance = 0.00001;
+               countDistinctTest(2, 100, 1000, ex,tolerance);
+       }
+
+       @Test
+       public void test120Unique() {
+               LopProperties.ExecType ex = LopProperties.ExecType.CP;
+               double tolerance = 0.00001 + 120 *  percentTolerance;
+               countDistinctTest(120, 100, 1000, ex,tolerance);
+       }
+
+       public void countDistinctTest(int numberDistinct, int cols, int rows, 
LopProperties.ExecType instType, double tolerance) {
+               Types.ExecMode platformOld = setExecMode(instType);
+               try {
+                       
loadTestConfiguration(getTestConfiguration(getTestName()));
+                       String HOME = SCRIPT_DIR + getTestDir();
+                       fullDMLScriptName = HOME + getTestName() + ".dml";
+                       String out = output("A");
+                       System.out.println(out);
+                       programArgs = new String[] { "-args", 
String.valueOf(numberDistinct), String.valueOf(rows),
+                                       String.valueOf(cols), out};
+
+                       runTest(true, false, null, -1);
+                       writeExpectedScalar("A", numberDistinct);
+                       compareResults(tolerance);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       assertTrue("Exception in execution: " + e.getMessage(), 
false);
+               } finally {
+                       rtplatform = platformOld;
+               }
+       }
+}
\ No newline at end of file
diff --git a/src/test/scripts/functions/countDistinct/countDistinct.dml 
b/src/test/scripts/functions/countDistinct/countDistinct.dml
new file mode 100644
index 0000000..a12ffe2
--- /dev/null
+++ b/src/test/scripts/functions/countDistinct/countDistinct.dml
@@ -0,0 +1,24 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+input = round(rand(rows = $2, cols = $3, min = 0, max = $1 -1, seed = 7)) # $2 x $3 matrix of random values in [0, $1-1], rounded, so at most $1 distinct values; fixed seed
+res = countDistinct(input) # number of distinct values in input
+write(res, $4, format="text") # result goes to the location passed as $4
diff --git a/src/test/scripts/functions/countDistinct/countDistinctApprox.dml b/src/test/scripts/functions/countDistinct/countDistinctApprox.dml
new file mode 100644
index 0000000..e8b964e
--- /dev/null
+++ b/src/test/scripts/functions/countDistinct/countDistinctApprox.dml
@@ -0,0 +1,24 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+input = round(rand(rows = $2, cols = $3, min = 0, max = $1 -1, seed = 7)) # $2 x $3 matrix of random values in [0, $1-1], rounded, so at most $1 distinct values; fixed seed
+res = countDistinctApprox(input) # presumably the estimating counterpart of countDistinct (name suggests an approximation) -- confirm
+write(res, $4, format="text") # result goes to the location passed as $4
\ No newline at end of file

Reply via email to