This is an automated email from the ASF dual-hosted git repository. janardhan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 8e9aaa5e3a [SYSTEMDS-3911] OOC Transpose operation (#2316) 8e9aaa5e3a is described below commit 8e9aaa5e3af50a6c51a20a6709c7a24ae7475956 Author: Janardhan Pulivarthi <j...@protonmail.com> AuthorDate: Thu Aug 28 08:12:59 2025 +0530 [SYSTEMDS-3911] OOC Transpose operation (#2316) This patch introduces the TransposeOOCInstruction, a fundamental component of the out-of-core (OOC) backend. It performs a full transpose on a matrix stream, enabling the composition of complex OOC pipelines required for algorithms like lmDS. Implementation details: * Asynchronous Producer: The processInstruction method submits a background task to the common thread pool to perform the transpose operation and returns control to the main thread immediately. This allows the compiler to continue building the execution plan without blocking. The task is submitted right away and processes blocks as it dequeues them from the input stream; a downstream consumer obtains the results by reading from the output stream. * Streaming Logic: The background task consumes a stream of IndexedMatrixValue blocks from its input. For each block, it: - Performs an in-memory transpose using the standard reorgOperations with a ReorgOperator. - Crucially, it also transposes the MatrixIndexes of the block (e.g., a block at (row=i, col=j) becomes a block at (row=j, col=i)). - Enqueues the new, transposed IndexedMatrixValue into the output stream. * Integration: The new instruction is fully integrated into the system: - A Reorg type has been added to the OOCType enum. - The OOCInstructionParser has been updated to handle the new Reorg type, so that the r' opcode is routed to the TransposeOOCInstruction for OOC execution. 
--- .../runtime/instructions/OOCInstructionParser.java | 3 + .../runtime/instructions/ooc/OOCInstruction.java | 2 +- .../instructions/ooc/TransposeOOCInstruction.java | 90 ++++++++++++++ .../sysds/test/functions/ooc/TransposeTest.java | 133 +++++++++++++++++++++ src/test/scripts/functions/ooc/Transpose.dml | 27 +++++ 5 files changed, 254 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index 9b1165b819..73b5ca0261 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -29,6 +29,7 @@ import org.apache.sysds.runtime.instructions.ooc.OOCInstruction; import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.UnaryOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.MatrixVectorBinaryOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.TransposeOOCInstruction; public class OOCInstructionParser extends InstructionParser { protected static final Log LOG = LogFactory.getLog(OOCInstructionParser.class.getName()); @@ -60,6 +61,8 @@ public class OOCInstructionParser extends InstructionParser { case AggregateBinary: case MAPMM: return MatrixVectorBinaryOOCInstruction.parseInstruction(str); + case Reorg: + return TransposeOOCInstruction.parseInstruction(str); default: throw new DMLRuntimeException("Invalid OOC Instruction Type: " + ooctype); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java index d3c2dfcbd7..0495dcfde5 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java @@ -30,7 +30,7 @@ 
public abstract class OOCInstruction extends Instruction { protected static final Log LOG = LogFactory.getLog(OOCInstruction.class.getName()); public enum OOCType { - Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary + Reblock, AggregateUnary, Binary, Unary, MAPMM, Reorg, AggregateBinary } protected final OOCInstruction.OOCType _ooctype; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java new file mode 100644 index 0000000000..fffd7ee7ed --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.runtime.instructions.ooc; + +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.functionobjects.SwapIndex; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.runtime.matrix.operators.UnaryOperator; +import org.apache.sysds.runtime.util.CommonThreadPool; + +import java.util.concurrent.ExecutorService; + +public class TransposeOOCInstruction extends ComputationOOCInstruction { + + protected TransposeOOCInstruction(OOCType type, ReorgOperator op, CPOperand in1, CPOperand out, String opcode, String istr) { + super(type, op, in1, out, opcode, istr); + + } + + public static TransposeOOCInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + InstructionUtils.checkNumFields(parts, 2); + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand out = new CPOperand(parts[2]); + + ReorgOperator reorg = new ReorgOperator(SwapIndex.getSwapIndexFnObject()); + return new TransposeOOCInstruction(OOCType.Reorg, reorg, in1, out, opcode, str); + } + + public void processInstruction( ExecutionContext ec ) { + + // Create thread and process the transpose operation + MatrixObject min = ec.getMatrixObject(input1); + LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle(); + LocalTaskQueue<IndexedMatrixValue> qOut = new LocalTaskQueue<>(); + 
ec.getMatrixObject(output).setStreamHandle(qOut); + + + ExecutorService pool = CommonThreadPool.get(); + try { + pool.submit(() -> { + IndexedMatrixValue tmp = null; + try { + while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + MatrixBlock inBlock = (MatrixBlock)tmp.getValue(); + long oldRowIdx = tmp.getIndexes().getRowIndex(); + long oldColIdx = tmp.getIndexes().getColumnIndex(); + + MatrixBlock outBlock = inBlock.reorgOperations((ReorgOperator) _optr, new MatrixBlock(), -1, -1, -1); + qOut.enqueueTask(new IndexedMatrixValue(new MatrixIndexes(oldColIdx, oldRowIdx), outBlock)); + } + qOut.closeInput(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + }); + } catch (Exception ex) { + throw new DMLRuntimeException(ex); + } finally { + pool.shutdown(); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java new file mode 100644 index 0000000000..0dc04043d4 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.functions.ooc; + +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.instructions.Instruction; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class TransposeTest extends AutomatedTestBase { + private final static String TEST_NAME1 = "Transpose"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + TransposeTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + private static final String INPUT_NAME = "X"; + private static final String OUTPUT_NAME = "res"; + + private final static int rows = 1000; + private final static int cols_wide = 1000; + private final static int cols_skinny = 500; + + private final static double sparsity1 = 0.7; + private final static double sparsity2 = 0.1; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1); + addTestConfiguration(TEST_NAME1, config); + } + + @Test + public void testTranspose1() { + runTransposeTest(cols_wide, false); + } + +// @Test +// public void testTranspose2() { +// runTransposeTest(cols_skinny, false); +// } + + private void runTransposeTest(int cols, boolean sparse ) + { + Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); + + try + { + getAndLoadTestConfiguration(TEST_NAME1); + String HOME = SCRIPT_DIR + 
TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[]{"-explain", "-stats", "-ooc", + "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; + + // 1. Generate the data as MatrixBlock object + double[][] A_data = getRandomMatrix(rows, cols, 0, 1, sparse?sparsity2:sparsity1, 10); + + // 2. Convert the double arrays to MatrixBlock object + MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data); + + // 3. Create a binary matrix writer + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); + + // 4. Write matrix A to a binary SequenceFile + writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros()); + HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64, + new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY); + + boolean exceptionExpected = false; + runTest(true, exceptionExpected, null, -1); + + double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000, 1000); + double result = 0.0; + for(int i = 0; i < rows; i++) { // verify the results with Java + double expected = 0.0; + for(int j = 0; j < cols; j++) { + expected = A_mb.get(i, j); + result = C1[j][i]; + Assert.assertEquals(expected, result, eps); + } + + } + + String prefix = Instruction.OOC_INST_PREFIX; + Assert.assertTrue("OOC wasn't used for RBLK", + heavyHittersContainsString(prefix + Opcodes.RBLK)); + Assert.assertTrue("OOC wasn't used for TRANSPOSE", + heavyHittersContainsString(prefix + Opcodes.TRANSPOSE)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + finally { + resetExecMode(platformOld); + } + } + + private static double[][] readMatrix(String fname, Types.FileFormat fmt, long rows, long cols, int brows, int bcols ) + throws IOException + { + MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols); + double[][] C = DataConverter.convertToDoubleMatrix(mb); + 
return C; + } +} diff --git a/src/test/scripts/functions/ooc/Transpose.dml b/src/test/scripts/functions/ooc/Transpose.dml new file mode 100644 index 0000000000..9b38939a2e --- /dev/null +++ b/src/test/scripts/functions/ooc/Transpose.dml @@ -0,0 +1,27 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Read the input matrix as a stream +X = read($1); + +res = t(X); + +write(res, $2, format="binary");