mboehm7 commented on code in PR #2290:
URL: https://github.com/apache/systemds/pull/2290#discussion_r2207756235


##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -389,10 +393,78 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
                return out;
        }
 
+       public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
+               if (k <= 1 || in.isEmptyBlock(false)) {

Review Comment:
   Add a check for a minimum input size before parallelizing; otherwise fall 
back to the single-threaded path, as we do, for example, in aggregations 
(LibMatrixAgg).
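
   A minimal sketch of such a guard, written against plain Java rather than the SystemDS classes. The names `PAR_MIN_CELLS` and `useParallel` are hypothetical, and the cutoff value is an assumption for illustration, not the constant LibMatrixAgg actually uses.

```java
final class RevThresholdSketch {
    // Hypothetical cutoff: below this many cells, task setup is assumed
    // to cost more than the parallel copy saves.
    static final long PAR_MIN_CELLS = 1024L * 1024L;

    /** Returns true only if more than one thread is requested and the input is large enough. */
    static boolean useParallel(int numRows, int numCols, int k) {
        return k > 1 && (long) numRows * numCols >= PAR_MIN_CELLS;
    }

    public static void main(String[] args) {
        System.out.println(useParallel(100, 100, 8));   // small input
        System.out.println(useParallel(4096, 1024, 8)); // large input
        System.out.println(useParallel(4096, 1024, 1)); // single thread requested
    }
}
```

   In `rev(in, out, k)`, a guard of this shape would sit next to the existing `k <= 1` and empty-block checks, so that all three cases share the single-threaded fallback.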



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -128,7 +128,11 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator
                                else
                                        return transpose(in, out);
                        case REV:
-                               return rev(in, out);
+//                             System.out.println("Reorg: rev() called with numThreads: " + op.getNumThreads());

Review Comment:
   Remove these commented-out sysout lines.



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -389,10 +393,78 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
                return out;
        }
 
+       public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
+               if (k <= 1 || in.isEmptyBlock(false)) {
+                       System.out.println("choosing single thread");
+                       return rev(in, out); // fallback to single-threaded
+
+               }
+               final int numRows = in.getNumRows();
+               final int numCols = in.getNumColumns();
+               final boolean sparse = in.isInSparseFormat();
+
+               // Prepare output block
+               out.reset(numRows, numCols, sparse);
+
+               // Before starting threads, ensure the output sparse block is allocated!
+               if (sparse) {
+                       out.allocateSparseRowsBlock(false);
+               }
+
+               // Set up thread pool
+               ExecutorService pool = CommonThreadPool.get(k);
+               try {
+                       int blklen = (int) Math.ceil((double) numRows / k);

Review Comment:
   I would recommend creating smaller tasks (e.g., numRows/k/4), which yields 
better load balancing.
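
   As a rough illustration of the finer partitioning, the following sketch derives row ranges with about four tasks per thread. The class and method names are hypothetical, and the factor 4 is taken from the example in the comment above rather than from any tuned setting.

```java
import java.util.ArrayList;
import java.util.List;

final class RevTaskPartitionSketch {
    /** Returns [start, end) row ranges sized for roughly 4 tasks per thread. */
    static List<int[]> rowRanges(int numRows, int k) {
        // Block length is numRows / k / 4, rounded up and never below 1.
        int blklen = Math.max(1, (int) Math.ceil((double) numRows / (4.0 * k)));
        List<int[]> ranges = new ArrayList<>();
        // Stepping by blklen until numRows avoids creating empty trailing tasks.
        for (int start = 0; start < numRows; start += blklen)
            ranges.add(new int[] {start, Math.min(start + blklen, numRows)});
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : rowRanges(10, 2))
            System.out.println(r[0] + " .. " + r[1]);
    }
}
```

   With more tasks than threads, a thread that finishes early can pick up another range, which is the load-balancing effect the comment refers to.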



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -389,10 +393,78 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
                return out;
        }
 
+       public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
+               if (k <= 1 || in.isEmptyBlock(false)) {
+                       System.out.println("choosing single thread");
+                       return rev(in, out); // fallback to single-threaded
+
+               }
+               final int numRows = in.getNumRows();
+               final int numCols = in.getNumColumns();
+               final boolean sparse = in.isInSparseFormat();
+
+               // Prepare output block
+               out.reset(numRows, numCols, sparse);
+
+               // Before starting threads, ensure the output sparse block is allocated!
+               if (sparse) {
+                       out.allocateSparseRowsBlock(false);
+               }
+
+               // Set up thread pool
+               ExecutorService pool = CommonThreadPool.get(k);
+               try {
+                       int blklen = (int) Math.ceil((double) numRows / k);
+                       List<Future<?>> tasks = new ArrayList<>();
+
+                       for (int i = 0; i < k; i++) {
+                               final int startRow = i * blklen;
+                               final int endRow = Math.min((i + 1) * blklen, numRows);
+
+                               tasks.add(pool.submit(() -> {
+                                       if (!sparse) {

Review Comment:
   Create a static method for this kernel that is called from both the 
single-threaded and the multi-threaded implementation.
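
   A sketch of that structure for the dense case, using plain `double[]` arrays in row-major layout as a stand-in for `MatrixBlock`. All names here are hypothetical, and the thread pool comes from `java.util.concurrent.Executors` instead of `CommonThreadPool`, so this only illustrates the shape of the refactoring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class RevKernelSketch {
    /** Shared kernel: fills output rows [rl, ru) with the mirrored input rows. */
    static void reverseDenseRows(double[] in, double[] out, int numRows, int numCols, int rl, int ru) {
        for (int r = rl; r < ru; r++)
            System.arraycopy(in, (numRows - r - 1) * numCols, out, r * numCols, numCols);
    }

    /** Single-threaded path: one kernel call over all rows. */
    static double[] revSingleThreaded(double[] in, int numRows, int numCols) {
        double[] out = new double[in.length];
        reverseDenseRows(in, out, numRows, numCols, 0, numRows);
        return out;
    }

    /** Multi-threaded path: the same kernel, one call per disjoint row range. */
    static double[] revMultiThreaded(double[] in, int numRows, int numCols, int k) {
        double[] out = new double[in.length];
        ExecutorService pool = Executors.newFixedThreadPool(k);
        try {
            int blklen = Math.max(1, (int) Math.ceil((double) numRows / k));
            List<Future<?>> tasks = new ArrayList<>();
            for (int rl = 0; rl < numRows; rl += blklen) {
                final int start = rl;
                final int end = Math.min(rl + blklen, numRows);
                tasks.add(pool.submit(() -> reverseDenseRows(in, out, numRows, numCols, start, end)));
            }
            for (Future<?> task : tasks)
                task.get(); // wait and surface any task failure
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        } finally {
            pool.shutdown();
        }
        return out;
    }
}
```

   Because each task writes a disjoint range of output rows, the kernel needs no synchronization, and both entry points stay trivially consistent with each other.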



##########
src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java:
##########
@@ -165,6 +188,76 @@ else if ( instType == ExecType.SPARK )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
                }
        }
-       
+
+       private void runReverseTestMultiThread(String testname, boolean matrix, boolean sparse, ExecType instType)
+       {
+               // Compare single-thread vs multi-thread results
+//             HashMap<CellIndex, Double> stResult = runReverseWithThreads(testname, matrix, sparse, instType, 1);
+               HashMap<CellIndex, Double> mtResult = runReverseWithThreads(testname, matrix, sparse, instType, 8);
+
+               // Compare results to ensure consistency
+//             TestUtils.compareMatrices(stResult, mtResult, 0, "ST-Result", "MT-Result");
+       }
+
+       private HashMap<CellIndex, Double> runReverseWithThreads(String testname, boolean matrix, boolean sparse, ExecType instType, int numThreads)
+       {
+               //rtplatform for MR
+               ExecMode platformOld = rtplatform;
+               switch( instType ){
+                       case SPARK: rtplatform = ExecMode.SPARK; break;
+                       default: rtplatform = ExecMode.HYBRID; break;
+               }
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == ExecMode.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+               String TEST_NAME = testname;
+
+               try
+               {
+                       System.setProperty("sysds.parallel.threads", String.valueOf(numThreads));

Review Comment:
   remove (I don't think we use this property internally)



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -2486,6 +2559,7 @@ private static void reverseDense(MatrixBlock in, MatrixBlock out) {
 
        private static void reverseSparse(MatrixBlock in, MatrixBlock out) {
                final int m = in.rlen;
+               System.out.println("inside reverseSparse");

Review Comment:
   remove



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -389,10 +393,78 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
                return out;
        }
 
+       public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
+               if (k <= 1 || in.isEmptyBlock(false)) {
+                       System.out.println("choosing single thread");
+                       return rev(in, out); // fallback to single-threaded
+
+               }
+               final int numRows = in.getNumRows();
+               final int numCols = in.getNumColumns();
+               final boolean sparse = in.isInSparseFormat();
+
+               // Prepare output block
+               out.reset(numRows, numCols, sparse);
+
+               // Before starting threads, ensure the output sparse block is allocated!
+               if (sparse) {
+                       out.allocateSparseRowsBlock(false);
+               }
+
+               // Set up thread pool
+               ExecutorService pool = CommonThreadPool.get(k);
+               try {
+                       int blklen = (int) Math.ceil((double) numRows / k);
+                       List<Future<?>> tasks = new ArrayList<>();
+
+                       for (int i = 0; i < k; i++) {
+                               final int startRow = i * blklen;
+                               final int endRow = Math.min((i + 1) * blklen, numRows);
+
+                               tasks.add(pool.submit(() -> {
+                                       if (!sparse) {
+                                               // Dense case
+                                               System.out.println("dense case");
+                                               double[] inVals = in.getDenseBlockValues();
+                                               double[] outVals = out.getDenseBlockValues();
+                                               for (int r = startRow; r < endRow; r++) {
+                                                       int revRow = numRows - r - 1;
+                                                       System.arraycopy(inVals, revRow * numCols, outVals, r * numCols,
+                                                                       numCols);

Review Comment:
   avoid this line break



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -2461,6 +2533,7 @@ public static int[] mergeNnzCounts(int[] cnt, int[] cnt2) {
        private static void reverseDense(MatrixBlock in, MatrixBlock out) {
                final int m = in.rlen;
                final int n = in.clen;
+               System.out.println("inside reverseDense");

Review Comment:
   remove



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -389,10 +393,78 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
                return out;
        }
 
+       public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
+               if (k <= 1 || in.isEmptyBlock(false)) {
+                       System.out.println("choosing single thread");
+                       return rev(in, out); // fallback to single-threaded
+
+               }
+               final int numRows = in.getNumRows();
+               final int numCols = in.getNumColumns();
+               final boolean sparse = in.isInSparseFormat();
+
+               // Prepare output block
+               out.reset(numRows, numCols, sparse);
+
+               // Before starting threads, ensure the output sparse block is allocated!
+               if (sparse) {
+                       out.allocateSparseRowsBlock(false);
+               }
+
+               // Set up thread pool
+               ExecutorService pool = CommonThreadPool.get(k);
+               try {
+                       int blklen = (int) Math.ceil((double) numRows / k);
+                       List<Future<?>> tasks = new ArrayList<>();
+
+                       for (int i = 0; i < k; i++) {
+                               final int startRow = i * blklen;
+                               final int endRow = Math.min((i + 1) * blklen, numRows);
+
+                               tasks.add(pool.submit(() -> {
+                                       if (!sparse) {
+                                               // Dense case
+                                               System.out.println("dense case");
+                                               double[] inVals = in.getDenseBlockValues();
+                                               double[] outVals = out.getDenseBlockValues();
+                                               for (int r = startRow; r < endRow; r++) {
+                                                       int revRow = numRows - r - 1;
+                                                       System.arraycopy(inVals, revRow * numCols, outVals, r * numCols,
+                                                                       numCols);
+                                               }
+                                       } else {
+                                               // Sparse case
+                                               System.out.println("Sparse case");
+                                               SparseBlock inBlk = in.getSparseBlock();
+                                               SparseBlock outBlk = out.getSparseBlock();
+                                               for (int r = startRow; r < endRow; r++) {
+                                                       int revRow = numRows - r - 1;
+                                                       if (!inBlk.isEmpty(revRow)) {
+                                                               outBlk.set(r, inBlk.get(revRow), true);
+                                                       }
+                                               }
+                                       }
+                               }));
+                       }
+
+                       // Wait for all threads
+                       for (Future<?> task : tasks) {
+                               task.get();
+                       }
+               } catch (Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               } finally {
+                       pool.shutdown();
+               }
+               out.recomputeNonZeros();

Review Comment:
   Avoid this unnecessary recomputation of the number of non-zeros. We 
should just set the nnz of the output to the nnz of the input, as this 
reorganization does not change values but only places them differently.
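
   The invariant behind this can be checked with a small stand-alone sketch on a row-major `double[]`. The helper names are hypothetical. In the PR itself, the fix would presumably be a one-line assignment along the lines of `out.setNonZeros(in.getNonZeros())`, which is an assumption about how the change would be written, not a quote from it.

```java
final class RevNnzSketch {
    /** Counts non-zero cells; stands in for a full recomputation pass. */
    static long countNonZeros(double[] vals) {
        long nnz = 0;
        for (double v : vals)
            if (v != 0)
                nnz++;
        return nnz;
    }

    /** Reverses the row order of a row-major dense matrix. */
    static double[] reverseRows(double[] in, int numRows, int numCols) {
        double[] out = new double[in.length];
        for (int r = 0; r < numRows; r++)
            System.arraycopy(in, (numRows - r - 1) * numCols, out, r * numCols, numCols);
        return out;
    }

    public static void main(String[] args) {
        double[] in = {0, 1, 2, 0, 0, 0}; // 3 x 2 matrix with two non-zeros
        long nnzIn = countNonZeros(in);
        double[] out = reverseRows(in, 3, 2);
        long nnzOut = nnzIn; // carried over instead of recounted
        System.out.println(nnzOut == countNonZeros(out));
    }
}
```

   Reversal only permutes whole rows, so the carried-over count always matches a recount, and the extra scan over the output can be dropped.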



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -389,10 +393,78 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
                return out;
        }
 
+       public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
+               if (k <= 1 || in.isEmptyBlock(false)) {
+                       System.out.println("choosing single thread");

Review Comment:
   As above: remove this sysout.



##########
src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java:
##########
@@ -389,10 +393,78 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
                return out;
        }
 
+       public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) {
+               if (k <= 1 || in.isEmptyBlock(false)) {
+                       System.out.println("choosing single thread");
+                       return rev(in, out); // fallback to single-threaded
+
+               }
+               final int numRows = in.getNumRows();
+               final int numCols = in.getNumColumns();
+               final boolean sparse = in.isInSparseFormat();
+
+               // Prepare output block
+               out.reset(numRows, numCols, sparse);
+
+               // Before starting threads, ensure the output sparse block is allocated!
+               if (sparse) {
+                       out.allocateSparseRowsBlock(false);
+               }
+
+               // Set up thread pool
+               ExecutorService pool = CommonThreadPool.get(k);
+               try {
+                       int blklen = (int) Math.ceil((double) numRows / k);
+                       List<Future<?>> tasks = new ArrayList<>();
+
+                       for (int i = 0; i < k; i++) {
+                               final int startRow = i * blklen;
+                               final int endRow = Math.min((i + 1) * blklen, numRows);
+
+                               tasks.add(pool.submit(() -> {
+                                       if (!sparse) {
+                                               // Dense case
+                                               System.out.println("dense case");

Review Comment:
   remove



##########
src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+
+public class TestMultiThreadedRev {

Review Comment:
   Remove this class, and instead ensure that some of the existing tests run 
with inputs large enough to trigger the multi-threaded code path too.



