e-strauss commented on code in PR #2230:
URL: https://github.com/apache/systemds/pull/2230#discussion_r1963633066


##########
src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java:
##########
@@ -403,25 +447,133 @@ else if(nnz == 0) // all was 0 -> return empty.
                return ret;
        }
 
+       private static MatrixBlock 
binaryMVComparisonColSingleThreadCompressed(CompressedMatrixBlock m1, 
MatrixBlock m2,
+                                                                               
                                                                   
BinaryOperator op, boolean left) {
+               final int nRows = m1.getNumRows();
+               final int nCols = m1.getNumColumns();
+
+               // get indicators (one-hot-encoded comparison results)
+               BinaryMVColTaskCompressed task =  new 
BinaryMVColTaskCompressed(m1, m2, 0, nRows, op, left);
+               long nnz = task.call();
+               int[] indicators = task._ret;
+
+               // map each unique indicator to an index
+               HashMapToInt<Integer> hm = new HashMapToInt<>(nCols*2);
+               int[] colMap = new int[nRows];
+               for(int i = 0; i < m1.getNumRows(); i++){
+                       int nextId = hm.size();
+                       int id = hm.putIfAbsentI(indicators[i], nextId);
+                       colMap[i] = id == -1 ? nextId : id;
+               }
+
+               // decode the unique indicator ints to SparseVectors
+               MatrixBlock outMb = getMCSRMatrixBlock(hm, nCols);
+
+               // create compressed block
+               return getCompressedMatrixBlock(m1, colMap, hm, outMb, nRows, 
nCols, nnz);
+       }
+
+       private static void fillSparseBlockFromIndicatorFromIndicatorInt(int 
numCol, Integer indicator, Integer rix, SparseBlockMCSR out) {
+               ArrayList<Integer> colIndices = new ArrayList<>(8);
+               for (int c = numCol - 1; c >= 0; c--) {
+                       if(indicator <= 0)
+                               break;
+                       if(indicator % 2 == 1){
+                               colIndices.add(c);
+                       }
+                       indicator = indicator >> 1;
+               }
+               SparseRow row = null;
+               if(colIndices.size() > 1){
+                       double[] vals = new double[colIndices.size()];
+                       Arrays.fill(vals, 1);
+                       int[] indices = new int[colIndices.size()];
+                       for (int i = 0, j = colIndices.size() - 1; i < 
colIndices.size(); i++, j--)
+                               indices[i] = colIndices.get(j);
+
+                       row = new SparseRowVector(vals, indices);
+               } else if(colIndices.size() == 1){
+                       row = new SparseRowScalar(colIndices.get(0), 1.0);
+               }
+               out.set(rix, row, false);
+       }
+
+       private static MatrixBlock 
binaryMVComparisonColMultiCompressed(CompressedMatrixBlock m1, MatrixBlock m2,
+                                                                               
                                                        BinaryOperator op, 
boolean left) throws Exception {
+               final int nRows = m1.getNumRows();
+               final int nCols = m1.getNumColumns();
+               final int k = op.getNumThreads();
+               final int blkz = nRows / k;
+
+               // get indicators (one-hot-encoded comparison results)
+               long nnz = 0;
+               final ArrayList<BinaryMVColTaskCompressed> tasks = new 
ArrayList<>();
+               final ExecutorService pool = 
CommonThreadPool.get(op.getNumThreads());
+               try {
+                       for(int i = 0; i < nRows; i += blkz) {
+                               tasks.add(new BinaryMVColTaskCompressed(m1, m2, 
i, Math.min(nRows, i + blkz), op, left));
+                       }
+                       for(Future<Long> f : pool.invokeAll(tasks))

Review Comment:
   but we have to to merge the results in a single thread anyways or do you 
want to use a concurrent hashmap?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@systemds.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to