This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 15df58fe0b [SYSTEMDS-3467] Support of MULTI_BLOCK Spark for countDistinct()
15df58fe0b is described below

commit 15df58fe0b4a9ab8290a34b57e08850c0e5b5f05
Author: Badrul Chowdhury <[email protected]>
AuthorDate: Fri Nov 18 17:17:06 2022 -0800

    [SYSTEMDS-3467] Support of MULTI_BLOCK Spark for countDistinct()
    
    This patch adds support for running MULTI_BLOCK aggregations
    for the countDistinct() builtin on the Spark backend.
    The implementation adds union() to CountDistinctFunctionSketch,
    which previously threw NotImplementedException; two sketches are
    merged per exponent key using the new BitMapValueCombiner.
    
    Closes #1734
---
 .../sketch/countdistinct/BitMapValueCombiner.java  | 41 +++++++++++++++
 .../countdistinct/CountDistinctFunctionSketch.java | 58 ++++++++++++++++++++--
 .../countDistinct/CountDistinctRowColBase.java     |  7 +++
 3 files changed, 102 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/BitMapValueCombiner.java b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/BitMapValueCombiner.java
new file mode 100644
index 0000000000..5799f86ba8
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/BitMapValueCombiner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data.sketch.countdistinct;
+
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+public class BitMapValueCombiner implements BinaryOperator<Set<Long>> {
+       // Merge function for two sets of fraction values: returns a set holding their union.
+       @Override
+       public Set<Long> apply(Set<Long> set0, Set<Long> set1) {
+               if (set0.isEmpty()) {
+                       return set1;
+               }
+               // NOTE: when either input is empty, the other input set itself is returned (no copy).
+               if (set1.isEmpty()) {
+                       return set0;
+               }
+               // Otherwise set0 is mutated in place and returned, so callers must not reuse set0 afterwards.
+               // Merging left-right is identical to merging right-left
+               set0.addAll(set1);
+               return set0;
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java
index efdcfa69a6..79c6daf39a 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.runtime.matrix.data.sketch.countdistinct;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.hops.OptimizerUtils;
@@ -31,7 +30,10 @@ import org.apache.sysds.runtime.matrix.operators.Operator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.OptionalInt;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class CountDistinctFunctionSketch extends CountDistinctSketch {
 
@@ -110,7 +112,7 @@ public class CountDistinctFunctionSketch extends CountDistinctSketch {
                        }
                }
 
-               MatrixBlock blkOutCorr = serializeInputMatrixBlock(bitMap, 
maxColumns);
+               MatrixBlock blkOutCorr = serialize(bitMap, maxColumns);
 
                // The sketch contains all relevant info, so the input matrix 
can be discarded at this point
                return new CorrMatrixBlock(blkIn, blkOutCorr);
@@ -121,7 +123,7 @@ public class CountDistinctFunctionSketch extends CountDistinctSketch {
                return kMask & (n >> startingIndex);
        }
 
-       private MatrixBlock serializeInputMatrixBlock(Map<Short, Set<Long>> 
bitMap, int maxWidth) {
+       private MatrixBlock serialize(Map<Short, Set<Long>> bitMap, int 
maxWidth) {
 
                // Each row in output matrix corresponds to a key and each 
column to a fraction value for that key.
                // The first column will store the exponent value itself:
@@ -150,9 +152,57 @@ public class CountDistinctFunctionSketch extends CountDistinctSketch {
                return blkOut;
        }
 
+       private Map<Short, Set<Long>> deserialize(MatrixBlock blkIn) {
+               int R = blkIn.getNumRows();
+               Map<Short, Set<Long>> bitMap = new HashMap<>();
+               // Rebuild the exponent -> fractions map from a sketch matrix with the row layout below.
+               // row_i: [exponent_i, N_i, fraction_i0, fraction_i1, .., 
fraction_iN, 0, .., 0]
+               for (int i=0; i<R; ++i) {
+                       short key = (short) blkIn.getValue(i, 0);
+                       Set<Long> fractions = bitMap.getOrDefault(key, new 
HashSet<>());
+                       // Column 1 holds N_i, the number of valid fractions; the zero padding after them is skipped.
+                       int C = (int) blkIn.getValue(i, 1);
+                       int j = 0;
+                       while (j < C) {
+                               long fraction = (long) blkIn.getValue(i, j + 2);
+                               fractions.add(fraction);
+                               ++j;
+                       }
+                       // Rows that share an exponent key accumulate into the same set.
+                       bitMap.put(key, fractions);
+               }
+               // NOTE(review): key, count and fractions are recovered via casts from getValue(); if cells are doubles, fractions above 2^53 lose precision -- confirm.
+               return bitMap;
+       }
+
        @Override
        public CorrMatrixBlock union(CorrMatrixBlock arg0, CorrMatrixBlock 
arg1) {
-               throw new NotImplementedException("MULTI_BLOCK aggregation is 
not supported yet");
+               MatrixBlock corr0 = arg0.getCorrection();
+               Map<Short, Set<Long>> bitMap0 = deserialize(corr0);
+               // arg1's sketch is decoded the same way; both maps are freshly built, so the mutating merge below is safe.
+               MatrixBlock corr1 = arg1.getCorrection();
+               Map<Short, Set<Long>> bitMap1 = deserialize(corr1);
+
+               // Map putAll() is not suitable here as it will replace Map 
values for identical keys.
+               // We will use a custom combiner with stream() and collect() 
instead.
+               Map<Short, Set<Long>> bitMapOut =
+                               Stream.concat(bitMap0.entrySet().stream(), 
bitMap1.entrySet().stream())
+                                               .collect(Collectors.toMap(
+                                                               
Map.Entry::getKey,
+                                                               
Map.Entry::getValue,
+                                                               new 
BitMapValueCombiner()
+                                               ));
+               // Keys present in both maps now map to the union of their fraction sets.
+               // Find the maximum column width
+               OptionalInt maxWidthOpt = 
bitMapOut.values().stream().mapToInt(Set::size).max();
+               if (maxWidthOpt.isEmpty()) {
+                       throw new IllegalArgumentException("Corrupt sketch: 
metadata is invalid");
+               }
+               // NOTE(review): max() is also empty when both sketches decode to empty maps -- confirm that case really is corrupt.
+               int maxWidth = maxWidthOpt.getAsInt();
+               MatrixBlock blkOutCorr = serialize(bitMapOut, maxWidth);
+               // Only the sketch (correction) is merged; arg0's value block is passed through.
+               return new CorrMatrixBlock(arg0.getValue(), blkOutCorr);
        }
 
        @Override
diff --git a/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java
index 5a7a61c6ce..c32ec12a54 100644
--- a/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java
+++ b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java
@@ -58,6 +58,13 @@ public abstract class CountDistinctRowColBase extends CountDistinctBase {
                countDistinctScalarTest(1723, 5000, 2000, 1.0, ex, tolerance);
        }
 
+       @Test
+       public void testSparkDenseXLarge() { // same arguments as the preceding test, executed on the SPARK backend
+               ExecType ex = ExecType.SPARK;
+               double tolerance = baseTolerance + 1723 * percentTolerance;
+               countDistinctScalarTest(1723, 5000, 2000, 1.0, ex, tolerance);
+       }
+
        @Test
        public void testCPDense1Unique() {
                ExecType ex = ExecType.CP;

Reply via email to