This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new b6adff8ae5 [MINOR] CompressedMatrixBlock parallel nonzero count
b6adff8ae5 is described below

commit b6adff8ae5fa2cc29fbede25e16b8b4c81c33c7c
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Mon Jan 6 15:22:52 2025 +0100

    [MINOR] CompressedMatrixBlock parallel nonzero count
---
 .../runtime/compress/CompressedMatrixBlock.java    | 31 ++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 68cfc6f983..e74e6c12f7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -28,6 +28,7 @@ import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.NotImplementedException;
@@ -88,6 +89,7 @@ import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.runtime.matrix.operators.TernaryOperator;
 import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.IndexRange;
 import org.apache.sysds.utils.DMLCompressionStatistics;
 import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
@@ -319,6 +321,35 @@ public class CompressedMatrixBlock extends MatrixBlock {
                return nonZeros;
        }
 
+       @Override
+       public long recomputeNonZeros(int k) {
+               if(k <= 1 || isOverlapping() || _colGroups.size() <= 1)
+                       return recomputeNonZeros();
+
+               final ExecutorService pool = CommonThreadPool.get(k);
+               try {
+                       List<Future<Long>> tasks = new ArrayList<>();
+                       for(AColGroup g : _colGroups)
+                               tasks.add(pool.submit(() -> g.getNumberNonZeros(rlen)));
+
+                       long nnz = 0;
+                       for(Future<Long> t : tasks)
+                               nnz += t.get();
+                       nonZeros = nnz;
+               }
+               catch(Exception e) {
+                       throw new DMLRuntimeException("Failed to count non zeros", e);
+               }
+               finally {
+                       pool.shutdown();
+               }
+
+               if(nonZeros == 0) // If there is no nonzeros then reallocate into single empty column group.
+                       allocateColGroup(ColGroupEmpty.create(getNumColumns()));
+
+               return nonZeros;
+       }
+
        @Override
        public long recomputeNonZeros(int rl, int ru) {
                throw new NotImplementedException();

Reply via email to