This is an automated email from the ASF dual-hosted git repository.

janardhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e9aaa5e3a [SYSTEMDS-3911] OOC Transpose operation (#2316)
8e9aaa5e3a is described below

commit 8e9aaa5e3af50a6c51a20a6709c7a24ae7475956
Author: Janardhan Pulivarthi <j...@protonmail.com>
AuthorDate: Thu Aug 28 08:12:59 2025 +0530

    [SYSTEMDS-3911] OOC Transpose operation (#2316)
    
    This patch introduces the TransposeOOCInstruction, a fundamental component 
of the out-of-core (OOC) backend. It performs a full transpose on a matrix 
stream, enabling the composition of complex OOC pipelines required for 
algorithms like lmDS.
    
    Implementation detail:
    
    * Asynchronous Producer: The processInstruction method launches a 
background thread to perform the transpose operation but returns control to the 
main thread immediately. This allows the compiler to continue building the 
execution plan without blocking. The actual computation is triggered when a 
downstream consumer "pulls" data from the output stream.
    
    * Streaming Logic: The background thread consumes a stream of 
IndexedMatrixValue blocks from its input. For each block, it:
        - Performs an in-memory transpose using the standard reorgOperations with a ReorgOperator.
        - Crucially, it also transposes the MatrixIndexes of the block (e.g., a 
block at (row=i, col=j) becomes a block at (row=j, col=i)).
        - Enqueues the new, transposed IndexedMatrixValue into the output 
stream.
    
    * Integration: The new instruction is fully integrated into the system:
        - A Reorg type has been added to the OOCType enum.
        - The OOCInstructionParser has been updated to recognize the r' opcode for OOC execution and, via the newly registered Reorg OOC type, route it to the TransposeOOCInstruction.
---
 .../runtime/instructions/OOCInstructionParser.java |   3 +
 .../runtime/instructions/ooc/OOCInstruction.java   |   2 +-
 .../instructions/ooc/TransposeOOCInstruction.java  |  90 ++++++++++++++
 .../sysds/test/functions/ooc/TransposeTest.java    | 133 +++++++++++++++++++++
 src/test/scripts/functions/ooc/Transpose.dml       |  27 +++++
 5 files changed, 254 insertions(+), 1 deletion(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java 
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
index 9b1165b819..73b5ca0261 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
@@ -29,6 +29,7 @@ import 
org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
 import org.apache.sysds.runtime.instructions.ooc.UnaryOOCInstruction;
 import 
org.apache.sysds.runtime.instructions.ooc.MatrixVectorBinaryOOCInstruction;
+import org.apache.sysds.runtime.instructions.ooc.TransposeOOCInstruction;
 
 public class OOCInstructionParser extends InstructionParser {
        protected static final Log LOG = 
LogFactory.getLog(OOCInstructionParser.class.getName());
@@ -60,6 +61,8 @@ public class OOCInstructionParser extends InstructionParser {
                        case AggregateBinary:
                        case MAPMM:
                                return 
MatrixVectorBinaryOOCInstruction.parseInstruction(str);
+                       case Reorg:
+                               return 
TransposeOOCInstruction.parseInstruction(str);
                        
                        default:
                                throw new DMLRuntimeException("Invalid OOC 
Instruction Type: " + ooctype);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
index d3c2dfcbd7..0495dcfde5 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java
@@ -30,7 +30,7 @@ public abstract class OOCInstruction extends Instruction {
        protected static final Log LOG = 
LogFactory.getLog(OOCInstruction.class.getName());
 
        public enum OOCType {
-               Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary
+               Reblock, AggregateUnary, Binary, Unary, MAPMM, Reorg, 
AggregateBinary
        }
 
        protected final OOCInstruction.OOCType _ooctype;
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
new file mode 100644
index 0000000000..fffd7ee7ed
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.functionobjects.SwapIndex;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+import java.util.concurrent.ExecutorService;
+
+public class TransposeOOCInstruction extends ComputationOOCInstruction {
+
+       protected TransposeOOCInstruction(OOCType type, ReorgOperator op, 
CPOperand in1, CPOperand out, String opcode, String istr) {
+               super(type, op, in1, out, opcode, istr);
+
+       }
+
+       public static TransposeOOCInstruction parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields(parts, 2);
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand out = new CPOperand(parts[2]);
+
+               ReorgOperator reorg = new 
ReorgOperator(SwapIndex.getSwapIndexFnObject());
+               return new TransposeOOCInstruction(OOCType.Reorg, reorg, in1, 
out, opcode, str);
+       }
+
+       public void processInstruction( ExecutionContext ec ) {
+
+               // Create thread and process the transpose operation
+               MatrixObject min = ec.getMatrixObject(input1);
+               LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
+               LocalTaskQueue<IndexedMatrixValue> qOut = new 
LocalTaskQueue<>();
+               ec.getMatrixObject(output).setStreamHandle(qOut);
+
+
+               ExecutorService pool = CommonThreadPool.get();
+               try {
+                       pool.submit(() -> {
+                               IndexedMatrixValue tmp = null;
+                               try {
+                                       while ((tmp = qIn.dequeueTask()) != 
LocalTaskQueue.NO_MORE_TASKS) {
+                                               MatrixBlock inBlock = 
(MatrixBlock)tmp.getValue();
+                                               long oldRowIdx = 
tmp.getIndexes().getRowIndex();
+                                               long oldColIdx = 
tmp.getIndexes().getColumnIndex();
+
+                                               MatrixBlock outBlock = 
inBlock.reorgOperations((ReorgOperator) _optr, new MatrixBlock(), -1, -1, -1);
+                                               qOut.enqueueTask(new 
IndexedMatrixValue(new MatrixIndexes(oldColIdx, oldRowIdx), outBlock));
+                                       }
+                                       qOut.closeInput();
+                               }
+                               catch(Exception ex) {
+                                       throw new DMLRuntimeException(ex);
+                               }
+                       });
+               } catch (Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               } finally {
+                       pool.shutdown();
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java 
b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
new file mode 100644
index 0000000000..0dc04043d4
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Functional test for the out-of-core (OOC) transpose instruction.
+ *
+ * Writes a random matrix in binary format, runs the Transpose.dml script with
+ * the -ooc flag, reads the result back, and checks that every cell (j,i) of
+ * the output equals cell (i,j) of the input. It also checks that the OOC
+ * reblock and transpose opcodes appear among the heavy hitters.
+ */
+public class TransposeTest extends AutomatedTestBase {
+       private final static String TEST_NAME1 = "Transpose";
+       private final static String TEST_DIR = "functions/ooc/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + TransposeTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-10;
+       private static final String INPUT_NAME = "X";
+       private static final String OUTPUT_NAME = "res";
+
+       private final static int rows = 1000;
+       private final static int cols_wide = 1000;
+       private final static int cols_skinny = 500;
+
+       private final static double sparsity1 = 0.7;
+       private final static double sparsity2 = 0.1;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+               addTestConfiguration(TEST_NAME1, config);
+       }
+
+       @Test
+       public void testTranspose1() {
+               runTransposeTest(cols_wide, false);
+       }
+
+       // NOTE(review): the non-square case is still disabled. The result is now
+       // read back with the transposed dimensions, so it may be possible to
+       // enable it - TODO confirm by running it.
+//  @Test
+//  public void testTranspose2() {
+//    runTransposeTest(cols_skinny, false);
+//  }
+
+       /**
+        * Runs the transpose script on a random rows x cols matrix and verifies
+        * the result cell by cell.
+        *
+        * @param cols   number of columns of the generated input matrix
+        * @param sparse if true the input is generated with sparsity2, otherwise sparsity1
+        */
+       private void runTransposeTest(int cols, boolean sparse )
+       {
+               Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+               try
+               {
+                       getAndLoadTestConfiguration(TEST_NAME1);
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[]{"-explain", "-stats", "-ooc",
+                               "-args", input(INPUT_NAME), output(OUTPUT_NAME)};
+
+                       // 1. Generate the input data as a double array
+                       double[][] A_data = getRandomMatrix(rows, cols, 0, 1, sparse?sparsity2:sparsity1, 10);
+
+                       // 2. Convert the double array to a MatrixBlock object
+                       MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data);
+
+                       // 3. Create a binary matrix writer
+                       MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+
+                       // 4. Write matrix A and its metadata file in binary format
+                       writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros());
+                       HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+                               new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY);
+
+                       boolean exceptionExpected = false;
+                       runTest(true, exceptionExpected, null, -1);
+
+                       // The output is t(X), so it has cols rows and rows columns.
+                       double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, cols, rows, 1000, 1000);
+                       for(int i = 0; i < rows; i++) { // verify the results with Java
+                               for(int j = 0; j < cols; j++) {
+                                       double expected = A_mb.get(i, j);
+                                       double result = C1[j][i];
+                                       Assert.assertEquals(expected, result, eps);
+                               }
+                       }
+
+                       String prefix = Instruction.OOC_INST_PREFIX;
+                       Assert.assertTrue("OOC wasn't used for RBLK",
+                               heavyHittersContainsString(prefix + Opcodes.RBLK));
+                       Assert.assertTrue("OOC wasn't used for TRANSPOSE",
+                               heavyHittersContainsString(prefix + Opcodes.TRANSPOSE));
+               }
+               catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+               finally {
+                       resetExecMode(platformOld);
+               }
+       }
+
+       /**
+        * Reads a matrix file and converts it to a dense double array.
+        *
+        * @param fname file name of the matrix
+        * @param fmt   file format
+        * @param rows  number of rows of the stored matrix
+        * @param cols  number of columns of the stored matrix
+        * @param brows block size argument passed through to the reader
+        * @param bcols block size argument passed through to the reader
+        * @return the matrix contents as a two-dimensional double array
+        * @throws IOException if reading fails
+        */
+       private static double[][] readMatrix(String fname, Types.FileFormat fmt, long rows, long cols, int brows, int bcols )
+               throws IOException
+       {
+               MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols);
+               double[][] C = DataConverter.convertToDoubleMatrix(mb);
+               return C;
+       }
+}
diff --git a/src/test/scripts/functions/ooc/Transpose.dml 
b/src/test/scripts/functions/ooc/Transpose.dml
new file mode 100644
index 0000000000..9b38939a2e
--- /dev/null
+++ b/src/test/scripts/functions/ooc/Transpose.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the input matrix as a stream
+# ($1 is the first script argument: the input matrix path)
+X = read($1);
+
+# Transpose the input matrix
+res = t(X);
+
+# Write the transposed matrix in binary format
+# ($2 is the second script argument: the output path)
+write(res, $2, format="binary");

Reply via email to