This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 9e0a481540 [SYSTEMDS-3924] Additional tests for OOC stream creation/collect
9e0a481540 is described below
commit 9e0a4815405753e7407c49f5969fea356c1db56e
Author: Jannik Lindemann <[email protected]>
AuthorDate: Fri Oct 24 16:33:00 2025 +0200
[SYSTEMDS-3924] Additional tests for OOC stream creation/collect
Closes #2340.
---
.../test/functions/ooc/StreamCollectTest.java | 194 +++++++++++++++++++++
.../sysds/test/functions/ooc/TransposeTest.java | 54 +++---
src/test/scripts/functions/ooc/StreamCollect_1.dml | 31 ++++
src/test/scripts/functions/ooc/StreamCollect_2.dml | 31 ++++
src/test/scripts/functions/ooc/StreamCollect_3.dml | 30 ++++
5 files changed, 315 insertions(+), 25 deletions(-)
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/StreamCollectTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/StreamCollectTest.java
new file mode 100644
index 0000000000..877a0dd0b1
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/StreamCollectTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.cp.ReorgCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
+import org.apache.sysds.runtime.instructions.ooc.TransposeOOCInstruction;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixValue;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.Statistics;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests the hand-off from out-of-core (OOC) instructions to CP instructions: an OOC
+ * result is consumed by a CP operation and the outcome is checked against a reference.
+ */
+public class StreamCollectTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "StreamCollect";
+	private final static String TEST_DIR = "functions/ooc/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + StreamCollectTest.class.getSimpleName() + "/";
+	private final static int rows = 2000;
+	private final static int cols = 1000;
+	private final static int blen = 1000;
+	private final static String INPUT_NAME = "input";
+	private final static String OUTPUT_NAME = "res";
+	private final static double eps = 1e-10;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+		addTestConfiguration(TEST_NAME1, config);
+	}
+
+	/**
+	 * Executes a hand-written instruction sequence (read, OOC reblock, OOC transpose,
+	 * CP transpose) and checks that transposing twice reproduces the generated input.
+	 */
+	@Test
+	public void runRawInstructionSequenceTest() {
+		try {
+			getAndLoadTestConfiguration(TEST_NAME1);
+			// dense input (sparsity 1.0): nnz matches what is written to the .mtd file
+			final long nnz = (long) rows * cols;
+			MatrixBlock mb = MatrixBlock.randOperations(rows, cols, 1.0, -1, 1, "uniform", 7);
+			MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+			writer.writeMatrixToHDFS(mb, input(INPUT_NAME), rows, cols, blen, nnz);
+			HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+				new MatrixCharacteristics(rows, cols, blen, nnz), Types.FileFormat.BINARY);
+
+			ExecutionContext ec = new ExecutionContext(new LocalVariableMap());
+
+			VariableCPInstruction createIn = VariableCPInstruction.parseInstruction(
+				"CP°createvar°pREADX°" + input(INPUT_NAME) + "°false°MATRIX°binary°" + rows + "°" + cols
+					+ "°" + blen + "°" + nnz + "°copy");
+			VariableCPInstruction createInRblk = VariableCPInstruction.parseInstruction(
+				"CP°createvar°_mVar0°" + input("tmp0") + "°true°MATRIX°binary°" + rows + "°" + cols
+					+ "°" + blen + "°" + nnz + "°copy");
+			ReblockOOCInstruction rblkIn = ReblockOOCInstruction.parseInstruction(
+				"OOC°rblk°pREADX·MATRIX·FP64°_mVar0·MATRIX·FP64°" + blen + "°true");
+			// _mVar1 holds t(_mVar0), so its declared shape is cols x rows
+			VariableCPInstruction createOut = VariableCPInstruction.parseInstruction(
+				"CP°createvar°_mVar1°" + input("tmp1") + "°true°MATRIX°binary°" + cols + "°" + rows
+					+ "°" + blen + "°" + nnz + "°copy");
+			TransposeOOCInstruction oocTranspose = TransposeOOCInstruction.parseInstruction(
+				"OOC°r'°_mVar0·MATRIX·FP64°_mVar1·MATRIX·FP64");
+			VariableCPInstruction createOut2 = VariableCPInstruction.parseInstruction(
+				"CP°createvar°_mVar2°" + input("tmp2") + "°true°MATRIX°binary°" + rows + "°" + cols
+					+ "°" + blen + "°" + nnz + "°copy");
+			// the CP transpose consumes the output of the OOC transpose
+			ReorgCPInstruction cpTranspose = ReorgCPInstruction.parseInstruction(
+				"CP°r'°_mVar1·MATRIX·FP64°_mVar2·MATRIX·FP64°1");
+
+			createIn.processInstruction(ec);
+			createInRblk.processInstruction(ec);
+			rblkIn.processInstruction(ec);
+			createOut.processInstruction(ec);
+			oocTranspose.processInstruction(ec);
+			createOut2.processInstruction(ec);
+			cpTranspose.processInstruction(ec);
+
+			// t(t(X)) must reproduce X exactly (up to eps)
+			MatrixBlock out = ec.getMatrixInput("_mVar2");
+			double[][] expected = DataConverter.convertToDoubleMatrix(mb);
+			double[][] actual = DataConverter.convertToDoubleMatrix(out);
+			ec.releaseMatrixInput("_mVar2");
+			Assert.assertEquals("Unexpected number of rows", rows, actual.length);
+			for(int i = 0; i < rows; i++)
+				Assert.assertArrayEquals("Mismatch in row " + i, expected[i], actual[i], eps);
+		}
+		catch(Exception ex) {
+			// keep the full cause and stack trace instead of only the (possibly null) message
+			throw new RuntimeException(ex);
+		}
+	}
+
+	@Test
+	public void runAppTest1() {
+		runAppTest("_1", new int[] {1000, 1}, new int[] {1000, 1000});
+	}
+
+	@Test
+	public void runAppTest2() {
+		runAppTest("_2", new int[] {500, 1}, new int[] {500, 500});
+	}
+
+	@Test
+	public void runAppTest3() {
+		runAppTest("_3", new int[] {1500, 1}, new int[] {1500, 1500});
+	}
+
+	/**
+	 * Runs the script StreamCollect{scriptSuffix}.dml once with -ooc and once without,
+	 * and compares the two results.
+	 *
+	 * @param scriptSuffix suffix selecting the DML script, e.g. "_1"
+	 * @param inputDims    {rows, cols} of each generated input; the inputs are passed to the
+	 *                     script as $1, $2, ... in this order, followed by the output path
+	 */
+	private void runAppTest(String scriptSuffix, int[]... inputDims) {
+		Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+		try {
+			getAndLoadTestConfiguration(TEST_NAME1);
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME1 + scriptSuffix + ".dml";
+			List<String> proArgs = new ArrayList<>(List.of("-explain", "-stats", "-ooc", "-args"));
+
+			for(int i = 0; i < inputDims.length; i++) {
+				int mrows = inputDims[i][0];
+				int mcols = inputDims[i][1];
+				String inputPath = input(INPUT_NAME + "_" + i);
+
+				// dense random matrix with values in [0, 10], written as binary plus metadata
+				double[][] A_data = getRandomMatrix(mrows, mcols, 0, 10, 1, 10);
+				MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data);
+				MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
+				writer.writeMatrixToHDFS(A_mb, inputPath, mrows, mcols, blen, A_mb.getNonZeros());
+				HDFSTool.writeMetaDataFile(input(INPUT_NAME + "_" + i + ".mtd"), Types.ValueType.FP64,
+					new MatrixCharacteristics(mrows, mcols, blen, A_mb.getNonZeros()), Types.FileFormat.BINARY);
+				proArgs.add(inputPath);
+			}
+			proArgs.add(output(OUTPUT_NAME));
+
+			boolean exceptionExpected = false;
+
+			// 1) OOC run: at least one OOC instruction must have been executed
+			programArgs = proArgs.toArray(String[]::new);
+			runTest(true, exceptionExpected, null, -1);
+			Assert.assertTrue("OOC wasn't used", heavyHittersContainOOC());
+
+			// 2) reference run without -ooc, written to a separate output
+			proArgs.remove("-ooc");
+			proArgs.set(proArgs.size() - 1, output(OUTPUT_NAME + "_target"));
+			programArgs = proArgs.toArray(String[]::new);
+			runTest(true, exceptionExpected, null, -1);
+
+			HashMap<MatrixValue.CellIndex, Double> result = readDMLMatrixFromOutputDir(OUTPUT_NAME);
+			HashMap<MatrixValue.CellIndex, Double> target = readDMLMatrixFromOutputDir(OUTPUT_NAME + "_target");
+			TestUtils.compareMatrices(result, target, eps, "Result", "Target");
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+		finally {
+			resetExecMode(platformOld);
+		}
+	}
+
+	/**
+	 * Checks whether any opcode among the CP heavy hitters carries the OOC instruction prefix.
+	 *
+	 * @return true if at least one OOC instruction shows up in the heavy-hitter statistics
+	 */
+	private boolean heavyHittersContainOOC() {
+		for(String opcode : Statistics.getCPHeavyHitterOpCodes())
+			if(opcode.startsWith(Instruction.OOC_INST_PREFIX))
+				return true;
+		return false;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
index c7a037a50c..726fb971df 100644
--- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
@@ -44,9 +44,9 @@ public class TransposeTest extends AutomatedTestBase {
private static final String INPUT_NAME = "X";
private static final String OUTPUT_NAME = "res";
- private final static int rows = 1000;
- private final static int cols_wide = 1000;
- //private final static int cols_skinny = 500;
+ private final static int rows = 1500;
+ private final static int cols_wide = 2500;
+ private final static int cols_skinny = 500;
private final static double sparsity1 = 0.7;
private final static double sparsity2 = 0.1;
@@ -63,25 +63,32 @@ public class TransposeTest extends AutomatedTestBase {
runTransposeTest(cols_wide, false);
}
-// @Test
-// public void testTranspose2() {
-// runTransposeTest(cols_skinny, false);
-// }
+ /** Transposes a rows x cols_skinny input generated with sparsity1 (sparse = false). */
+ @Test
+ public void testTranspose2() {
+ runTransposeTest(cols_skinny, false);
+ }
+
+ /** Transposes a rows x cols_wide input generated with sparsity2 (sparse = true). */
+ @Test
+ public void testTransposeSparse1() {
+ runTransposeTest(cols_wide, true);
+ }
+
+ /** Transposes a rows x cols_skinny input generated with sparsity2 (sparse = true). */
+ @Test
+ public void testTransposeSparse2() {
+ runTransposeTest(cols_skinny, true);
+ }
- private void runTransposeTest(int cols, boolean sparse )
- {
+ private void runTransposeTest(int cols, boolean sparse) {
Types.ExecMode platformOld =
setExecMode(Types.ExecMode.SINGLE_NODE);
- try
- {
+ try {
getAndLoadTestConfiguration(TEST_NAME1);
String HOME = SCRIPT_DIR + TEST_DIR;
fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
- programArgs = new String[]{"-explain", "-stats", "-ooc",
- "-args",
input(INPUT_NAME), output(OUTPUT_NAME)};
+ programArgs = new String[] {"-explain", "-stats",
"-ooc", "-args", input(INPUT_NAME), output(OUTPUT_NAME)};
// 1. Generate the data as MatrixBlock object
- double[][] A_data = getRandomMatrix(rows, cols, 0, 1,
sparse?sparsity2:sparsity1, 10);
+ double[][] A_data = getRandomMatrix(rows, cols, 0, 1,
sparse ? sparsity2 : sparsity1, 10);
// 2. Convert the double arrays to MatrixBlock object
MatrixBlock A_mb =
DataConverter.convertToMatrixBlock(A_data);
@@ -92,12 +99,12 @@ public class TransposeTest extends AutomatedTestBase {
// 4. Write matrix A to a binary SequenceFile
writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows,
cols, 1000, A_mb.getNonZeros());
HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"),
Types.ValueType.FP64,
- new
MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()),
Types.FileFormat.BINARY);
+ new MatrixCharacteristics(rows, cols, 1000,
A_mb.getNonZeros()), Types.FileFormat.BINARY);
boolean exceptionExpected = false;
runTest(true, exceptionExpected, null, -1);
- double[][] C1 = readMatrix(output(OUTPUT_NAME),
Types.FileFormat.BINARY, rows, cols, 1000, 1000);
+ double[][] C1 = readMatrix(output(OUTPUT_NAME),
Types.FileFormat.BINARY, cols, rows, 1000);
double result = 0.0;
for(int i = 0; i < rows; i++) { // verify the results
with Java
double expected = 0.0;
@@ -110,12 +117,10 @@ public class TransposeTest extends AutomatedTestBase {
}
String prefix = Instruction.OOC_INST_PREFIX;
- Assert.assertTrue("OOC wasn't used for RBLK",
-
heavyHittersContainsString(prefix + Opcodes.RBLK));
- Assert.assertTrue("OOC wasn't used for TRANSPOSE",
-
heavyHittersContainsString(prefix + Opcodes.TRANSPOSE));
+ Assert.assertTrue("OOC wasn't used for RBLK",
heavyHittersContainsString(prefix + Opcodes.RBLK));
+ Assert.assertTrue("OOC wasn't used for TRANSPOSE",
heavyHittersContainsString(prefix + Opcodes.TRANSPOSE));
}
- catch (IOException e) {
+ catch(IOException e) {
throw new RuntimeException(e);
}
finally {
@@ -123,10 +128,9 @@ public class TransposeTest extends AutomatedTestBase {
}
}
- private static double[][] readMatrix(String fname, Types.FileFormat
fmt, long rows, long cols, int brows, int bcols )
- throws IOException
- {
- MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt,
rows, cols, brows, bcols);
+ /**
+ * Reads a matrix file via DataConverter.readMatrixFromHDFS and converts it to a
+ * double[][] so the test can verify individual cells.
+ *
+ * @param fname path of the matrix file
+ * @param fmt file format the matrix was written in
+ * @param rows number of rows of the stored matrix
+ * @param cols number of columns of the stored matrix
+ * @param blen block size used when the matrix was written
+ * @return the matrix values as double[][]
+ * @throws IOException if reading the matrix fails
+ */
+ private static double[][] readMatrix(String fname, Types.FileFormat
fmt, long rows, long cols, int blen)
+ throws IOException {
+ MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt,
rows, cols, blen);
double[][] C = DataConverter.convertToDoubleMatrix(mb);
return C;
}
diff --git a/src/test/scripts/functions/ooc/StreamCollect_1.dml b/src/test/scripts/functions/ooc/StreamCollect_1.dml
new file mode 100644
index 0000000000..08116c179d
--- /dev/null
+++ b/src/test/scripts/functions/ooc/StreamCollect_1.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the right-hand side b and the matrix X (X presumably streamed when run with -ooc)
+b = read($1);
+X = read($2);
+
+# Rank-1 outer product of the column sums of X, shifted by I so that solve() is well-posed
+OOC = rowSums(t(X)) %*% t(rowSums(t(X))) + diag(matrix(1, ncol(X), 1));
+OOC = OOC / 1.5;
+
+res = solve(OOC, b);
+write(res, $3, format="text");
diff --git a/src/test/scripts/functions/ooc/StreamCollect_2.dml b/src/test/scripts/functions/ooc/StreamCollect_2.dml
new file mode 100644
index 0000000000..0b643f7749
--- /dev/null
+++ b/src/test/scripts/functions/ooc/StreamCollect_2.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Read the vector b and the matrix X (X presumably streamed when run with -ooc)
+b = read($1);
+X = read($2);
+
+# Rank-1 outer product of X %*% b, shifted by I so that solve() is well-posed
+OOC = X %*% b;
+OOC = OOC %*% t(OOC) + diag(matrix(1, nrow(OOC), 1));
+
+res = solve(OOC, b);
+write(res, $3, format="text");
diff --git a/src/test/scripts/functions/ooc/StreamCollect_3.dml b/src/test/scripts/functions/ooc/StreamCollect_3.dml
new file mode 100644
index 0000000000..f58b7ff101
--- /dev/null
+++ b/src/test/scripts/functions/ooc/StreamCollect_3.dml
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Load the right-hand side b and the coefficient matrix X
+b = read($1);
+X = read($2);
+
+# Round X up element-wise, then solve the resulting system for b
+A = ceil(X);
+res = solve(A, b);
+
+write(res, $3, format="text");