This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 4151578e7b [SYSTEMDS-3601] Performance IO benchmark
4151578e7b is described below
commit 4151578e7baab492b469728a4991e90b9aa41da6
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed Sep 6 11:14:00 2023 +0200
[SYSTEMDS-3601] Performance IO benchmark
Adds an IO benchmark for writing uncompressed and compressed matrices.
The numbers measured with this commit are:
Generator: ConstMatrix ( Rows:10000, Cols:100, Spar:1.0, Unique: 4)
StandardDisk, 16.821+- 0.829 ms, 475607133 Byte/s
Compress StandardIO, 9.806+- 1.066 ms, 815833646 Byte/s
Update&Apply Standard IO, 4.460+- 0.444 ms, 1793930325 Byte/s
Closes #1897
---
.../java/org/apache/sysds/performance/Main.java | 72 +++++++++++++++++--
.../org/apache/sysds/performance/PerfUtil.java | 34 +++++++++
.../java/org/apache/sysds/performance/README.md | 19 +++++
.../org/apache/sysds/performance/TimingUtils.java | 6 +-
.../sysds/performance/compression/APerfTest.java | 36 +++++++---
.../sysds/performance/compression/Serialize.java | 46 +++++++------
.../generators/{IGenerate.java => Const.java} | 36 +---------
.../sysds/performance/generators/ConstFrame.java | 67 ++++++++++++++++++
.../sysds/performance/generators/ConstMatrix.java | 24 ++++---
.../sysds/performance/generators/FrameFile.java | 80 ++++++++++++++++++++++
.../sysds/performance/generators/IGenerate.java | 5 ++
.../sysds/performance/generators/MatrixFile.java | 58 ++++++++++++++++
12 files changed, 403 insertions(+), 80 deletions(-)
diff --git a/src/test/java/org/apache/sysds/performance/Main.java
b/src/test/java/org/apache/sysds/performance/Main.java
index fa89a62b53..3fd2def237 100644
--- a/src/test/java/org/apache/sysds/performance/Main.java
+++ b/src/test/java/org/apache/sysds/performance/Main.java
@@ -24,12 +24,18 @@ import org.apache.sysds.performance.compression.SchemaTest;
import org.apache.sysds.performance.compression.Serialize;
import org.apache.sysds.performance.compression.StreamCompress;
import org.apache.sysds.performance.generators.ConstMatrix;
+import org.apache.sysds.performance.generators.FrameFile;
import org.apache.sysds.performance.generators.GenMatrices;
+import org.apache.sysds.performance.generators.IGenerate;
+import org.apache.sysds.performance.generators.MatrixFile;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.test.TestUtils;
public class Main {
- private static void exec(int prog, String[] args) throws
InterruptedException, Exception {
+ private static void exec(int prog, String[] args) throws Exception {
switch(prog) {
case 1:
new StreamCompress(100, new GenMatrices(10000,
100, 32, 1.0)).run();
@@ -79,12 +85,25 @@ public class Main {
case 12:
run11(args, Integer.parseInt(args[7]));
break;
+ case 13:
+ run13(args);
+ break;
+ case 14:
+ run14(args);
+ break;
+
+ case 15:
+ run15(args);
+ break;
+ case 16:
+ run16(args);
+ break;
default:
break;
}
}
- private static void run9(String[] args) throws InterruptedException,
Exception {
+ private static void run9(String[] args) throws Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
@@ -94,7 +113,7 @@ public class Main {
new IOBandwidth(n, new ConstMatrix(rows, cols, unique,
sparsity), k).run();
}
- private static void run10(String[] args) throws InterruptedException,
Exception {
+ private static void run10(String[] args) throws Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
@@ -104,7 +123,7 @@ public class Main {
new IOBandwidth(n, new ConstMatrix(rows, cols, unique,
sparsity), k).runVector();
}
- private static void run11(String[] args, int id) throws
InterruptedException, Exception {
+ private static void run11(String[] args, int id) throws Exception {
int rows = Integer.parseInt(args[1]);
int cols = Integer.parseInt(args[2]);
int unique = Integer.parseInt(args[3]);
@@ -120,13 +139,56 @@ public class Main {
s.run(id);
}
+ private static void run13(String[] args) throws Exception {
+ int k = Integer.parseInt(args[1]);
+ int n = Integer.parseInt(args[2]);
+ String p = args[3];
+ int id = Integer.parseInt(args[4]);
+ run13A(n, MatrixFile.create(p), k, id);
+ }
+
+ private static void run14(String[] args) throws Exception {
+ int k = Integer.parseInt(args[1]);
+ int n = Integer.parseInt(args[2]);
+ String p = args[3]; // input frame
+ String s = args[4]; // spec
+ int id = Integer.parseInt(args[5]);
+ // run13A(n, FrameTransformFile.create(p, s), k, id);
+ }
+
+ private static void run13A(int n, IGenerate<MatrixBlock> g, int k, int
id) throws Exception {
+
+ Serialize s = new Serialize(n, g, k);
+
+ if(id == -1)
+ s.run();
+ else
+ s.run(id);
+ }
+
+ private static void run15(String[] args) throws Exception {
+ int k = Integer.parseInt(args[1]);
+ int n = Integer.parseInt(args[2]);
+ IGenerate<FrameBlock> g = FrameFile.create(args[3]);
+ String spec = args[4];
+ // new TransformPerf(n, k, g, spec).run();
+ }
+
+ private static void run16(String[] args) {
+ int len = Integer.parseInt(args[1]);
+ MatrixBlock mb =
TestUtils.ceil(TestUtils.generateTestMatrixBlock(len, len, 0, 100, 0.01, len
+1));
+ System.out.println(mb);
+ }
+
+
public static void main(String[] args) {
try {
exec(Integer.parseInt(args[0]), args);
}
catch(Exception e) {
e.printStackTrace();
- }finally{
+ }
+ finally {
CommonThreadPool.get().shutdown();
}
}
diff --git a/src/test/java/org/apache/sysds/performance/PerfUtil.java
b/src/test/java/org/apache/sysds/performance/PerfUtil.java
new file mode 100644
index 0000000000..f93b03bdb3
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/PerfUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface PerfUtil {
+
+ public static String readSpec(String path) throws IOException {
+ InputStream in = new FileInputStream(path);
+ String spec = new String(in.readAllBytes());
+ in.close();
+ return spec;
+ }
+}
diff --git a/src/test/java/org/apache/sysds/performance/README.md
b/src/test/java/org/apache/sysds/performance/README.md
index 206bdedbc0..7e7edbb805 100644
--- a/src/test/java/org/apache/sysds/performance/README.md
+++ b/src/test/java/org/apache/sysds/performance/README.md
@@ -51,3 +51,22 @@ With profiler:
```bash
java -jar
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
target/systemds-3.2.0-SNAPSHOT-perf.jar 12 10000 100 4 1.0 16 1000 -1
```
+
+Take a Matrix and perform serialization
+
+```bash
+java -jar
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
target/systemds-3.2.0-SNAPSHOT-perf.jar 13 16 100 "temp/test.csv" -1
+```
+
+Take a frame, transform it into a matrix, and perform serialization.
+
+```bash
+java -jar
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
target/systemds-3.2.0-SNAPSHOT-perf.jar 14 16 1000
"src/test/resources/datasets/titanic/titanic.csv"
"src/test/resources/datasets/titanic/tfspec.json" -1
+```
+
+Frame Operation timings
+
+```bash
+java -jar
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
target/systemds-3.2.0-SNAPSHOT-perf.jar 15 16 10
"src/test/resources/datasets/titanic/titanic.csv"
"src/test/resources/datasets/titanic/tfspec.json"
+```
+
diff --git a/src/test/java/org/apache/sysds/performance/TimingUtils.java
b/src/test/java/org/apache/sysds/performance/TimingUtils.java
index 9a02854a5d..11e2c1dca5 100644
--- a/src/test/java/org/apache/sysds/performance/TimingUtils.java
+++ b/src/test/java/org/apache/sysds/performance/TimingUtils.java
@@ -80,15 +80,17 @@ public interface TimingUtils {
* it in the timing of the operation
*
* @param f The function to time
- * @param c A cleanup funtion or part that should not be timed.
+ * @param c A cleanup function or part that should not be timed.
+ * @param b A setup function that should not be timed.
* @param rep The number of repetitions to make
* @param bq The generator for the input
* @return A list of the individual repetitions execution time
* @throws InterruptedException An exception in case the job gets
interrupted
*/
- public static double[] time(F f, F c, int rep, IGenerate<?> bq) throws
InterruptedException {
+ public static double[] time(F f, F c, F b, int rep, IGenerate<?> bq)
throws InterruptedException {
double[] times = new double[rep];
for(int i = 0; i < rep; i++) {
+ b.run();
while(bq.isEmpty())
Thread.sleep(bq.defaultWaitTime());
time(f, times, i);
diff --git
a/src/test/java/org/apache/sysds/performance/compression/APerfTest.java
b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java
index 5533205f2d..74114bf84e 100644
--- a/src/test/java/org/apache/sysds/performance/compression/APerfTest.java
+++ b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java
@@ -43,16 +43,20 @@ public abstract class APerfTest<T, G> {
}
protected void execute(F f, String name) throws InterruptedException {
- execute(f, () -> {
- return;
- }, name);
+ N n = new N();
+ execute(f, n, n, name);
}
protected void execute(F f, F c, String name) throws
InterruptedException {
+ N n = new N();
+ execute(f, c, n, name);
+ }
+
+ protected void execute(F f, F c, F b, String name) throws
InterruptedException {
warmup(f, 10);
gen.generate(N);
ret.clear();
- double[] times = TimingUtils.time(f, c, N, gen);
+ double[] times = TimingUtils.time(f, c, b, N, gen);
String retS = makeResString(times);
System.out.println(String.format("%35s, %s, %10s", name,
TimingUtils.stats(times), retS));
}
@@ -63,15 +67,18 @@ public abstract class APerfTest<T, G> {
}
protected void execute(F f, String name, int N) throws
InterruptedException {
- execute(f, () -> {
- return;
- }, name, N);
+ N none = new N();
+ execute(f, none, none, name, N);
}
protected void execute(F f, F c, String name, int N) throws
InterruptedException {
+ execute(f, c, new N(), name, N);
+ }
+
+ protected void execute(F f, F c, F b, String name, int N) throws
InterruptedException {
gen.generate(N);
ret.clear();
- double[] times = TimingUtils.time(f, c, N, gen);
+ double[] times = TimingUtils.time(f, c, b, N, gen);
String retS = makeResString(times);
System.out.println(String.format("%35s, %s, %10s", name,
TimingUtils.stats(times), retS));
}
@@ -86,8 +93,19 @@ public abstract class APerfTest<T, G> {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(String.format("%20s ",
this.getClass().getSimpleName()));
- sb.append(" Repetitions: ").append(N).append(" ");
+ sb.append(" Repetitions: ").append(N).append("\n");
+ sb.append(String.format("%20s ","Generator:"));
sb.append(gen);
+ sb.append("\n");
return sb.toString();
}
+
+ private class N implements F {
+
+ @Override
+ public void run() {
+ // co nothing
+ }
+
+ }
}
diff --git
a/src/test/java/org/apache/sysds/performance/compression/Serialize.java
b/src/test/java/org/apache/sysds/performance/compression/Serialize.java
index 149b1811f2..12316874c1 100644
--- a/src/test/java/org/apache/sysds/performance/compression/Serialize.java
+++ b/src/test/java/org/apache/sysds/performance/compression/Serialize.java
@@ -27,6 +27,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
+import java.util.ArrayList;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.conf.CompilerConfig.ConfigType;
@@ -65,42 +66,42 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
super(N, gen);
this.file = file;
this.k = k;
-
+
}
public void run() throws Exception, InterruptedException {
CompressedMatrixBlock.debug = true;
+ CompressedMatrixBlock.debug = false;
System.out.println(this);
File directory = new File(file).getParentFile();
if(!directory.exists()) {
directory.mkdir();
}
- if(k == 1){
+
+ if(k == 1) {
ConfigurationManager.getCompilerConfig().set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS,
false);
}
warmup(() -> sumTask(k), N);
- cleanup();
- execute(() -> writeUncompressed(k), "Serialize");
+
+ // execute(() -> writeUncompressed(k), "Serialize");
// execute(() -> diskUncompressed(k), "CustomDisk");
- cleanup();
- execute(() -> standardIO(k), () -> setFileSize(),
"StandardDisk");
- cleanup();
- execute(() -> compressTask(k), "Compress Normal");
+ execute(() -> standardIO(k), () -> setFileSize(), () ->
cleanup(), "StandardDisk");
+
+ // execute(() -> compressTask(k), "Compress Normal");
// execute(() -> writeCompressTask(k), "Compress Normal
Serialize");
// execute(() -> diskCompressTask(k), "Compress Normal
CustomDisk");
- cleanup();
- execute(() -> standardCompressedIO(k), () -> setFileSize(),
"Compress StandardIO");
- cleanup();
+
+ execute(() -> standardCompressedIO(k), () -> setFileSize(), ()
-> cleanup(), "Compress StandardIO");
final CompressionScheme sch2 = CLALibScheme.getScheme(getC());
- execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply
Scheme Fused");
+ // execute(() -> updateAndApplySchemeFused(sch2, k),
"Update&Apply Scheme Fused");
// execute(() -> writeUpdateAndApplySchemeFused(sch2, k),
"Update&Apply Scheme Fused Serialize");
- // cleanup();
// execute(() -> diskUpdateAndApplySchemeFused(sch2, k),
"Update&Apply Scheme Fused Disk");
- cleanup();
- execute(() -> standardCompressedIOUpdateAndApply(sch2, k), ()
-> setFileSize(), "Update&Apply Standard IO");
+
+ execute(() -> standardCompressedIOUpdateAndApply(sch2, k), ()
-> setFileSize(), () -> cleanup(),
+ "Update&Apply Standard IO");
}
public void run(int i) throws Exception, InterruptedException {
@@ -110,7 +111,7 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
ConfigurationManager.getCompilerConfig().set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS,
false);
}
- final CompressionScheme sch = CLALibScheme.getScheme(getC());
+ final CompressionScheme sch = (i == 8 || i == 9 || i == 10 || i
== 11) ? CLALibScheme.getScheme(getC()) : null;
cleanup();
switch(i) {
case 1:
@@ -120,7 +121,7 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
execute(() -> diskUncompressed(k),
"CustomDisk");
break;
case 3:
- execute(() -> standardIO(k), () ->
setFileSize(), "StandardDisk");
+ execute(() -> standardIO(k), () ->
setFileSize(), () -> cleanup(), "StandardDisk");
break;
case 4:
execute(() -> compressTask(k), "Compress
Normal");
@@ -132,7 +133,7 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
execute(() -> diskCompressTask(k), "Compress
Normal CustomDisk");
break;
case 7:
- execute(() -> standardCompressedIO(k), () ->
setFileSize(), "Compress StandardIO");
+ execute(() -> standardCompressedIO(k), () ->
setFileSize(), () -> cleanup(), "Compress StandardIO");
break;
case 8:
execute(() -> updateAndApplySchemeFused(sch,
k), "Update&Apply Scheme Fused");
@@ -144,7 +145,7 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
execute(() ->
diskUpdateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused Disk");
break;
case 11:
- execute(() ->
standardCompressedIOUpdateAndApply(sch, k), () -> setFileSize(),
+ execute(() ->
standardCompressedIOUpdateAndApply(sch, k), () -> setFileSize(), () ->
cleanup(),
"Update&Apply Standard IO");
break;
}
@@ -296,6 +297,10 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
@Override
protected String makeResString(double[] times) {
+ return makeResString(ret, times);
+ }
+
+ public static String makeResString(ArrayList<InOut> ret, double[]
times) {
double totalIn = 0;
double totalOut = 0;
double totalTime = 0.0;
@@ -334,6 +339,7 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
double stdOut = Math.sqrt(varOut / el);
return String.format("%12.0f+-%12.0f Byte/s, %12.0f+-%12.0f
Byte/s", bytePerMsIn, stdIn, bytePerMsOut, stdOut);
+
}
public static int compare(InOut a, InOut b) {
@@ -473,7 +479,7 @@ public class Serialize extends APerfTest<Serialize.InOut,
MatrixBlock> {
return super.toString() + " threads: " + k;
}
- protected class InOut {
+ protected static class InOut {
protected long in;
protected long out;
protected double time;
diff --git
a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
b/src/test/java/org/apache/sysds/performance/generators/Const.java
similarity index 52%
copy from src/test/java/org/apache/sysds/performance/generators/IGenerate.java
copy to src/test/java/org/apache/sysds/performance/generators/Const.java
index ee39590bf3..2d3adc1ace 100644
--- a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
+++ b/src/test/java/org/apache/sysds/performance/generators/Const.java
@@ -19,38 +19,6 @@
package org.apache.sysds.performance.generators;
-/**
- * Generator interface for task generation.
- */
-public interface IGenerate<T> {
-
- /**
- * Validate if the generator is empty, and we have to wait for elements.
- *
- * @return If the generator is empty
- */
- public boolean isEmpty();
-
- /**
- * Default wait time for the generator to fill
- *
- * @return The wait time
- */
- public int defaultWaitTime();
-
- /**
- * A Blocking take operation that waits for the Generator to fill that
element
- *
- * @return An task element
- */
- public T take();
-
- /**
- * A Non blocking async operation that generates elements for the task
que
- *
- * @param N The number of elements to create
- * @throws InterruptedException An exception if the task is interrupted
- */
- public void generate(int N) throws InterruptedException;
-
+public interface Const<T> extends IGenerate<T> {
+ public void change(T t);
}
diff --git
a/src/test/java/org/apache/sysds/performance/generators/ConstFrame.java
b/src/test/java/org/apache/sysds/performance/generators/ConstFrame.java
new file mode 100644
index 0000000000..13f7392380
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/generators/ConstFrame.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance.generators;
+
+import java.util.Arrays;
+
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+
+public class ConstFrame implements Const<FrameBlock> {
+
+ protected FrameBlock fb;
+
+ public ConstFrame(FrameBlock fb) {
+ this.fb = fb;
+ }
+
+ @Override
+ public FrameBlock take() {
+ return fb;
+ }
+
+ @Override
+ public void generate(int N) throws InterruptedException {
+ // do nothing
+ }
+
+ @Override
+ public final boolean isEmpty() {
+ return false;
+ }
+
+ @Override
+ public final int defaultWaitTime() {
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClass().getSimpleName());
+ sb.append(" Schema:");
+ sb.append(Arrays.toString(fb.getSchema()));
+ return sb.toString();
+ }
+
+ @Override
+ public void change(FrameBlock t) {
+ fb = t;
+ }
+}
diff --git
a/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java
b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java
index f01d0a2075..f43e48caa7 100644
--- a/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java
+++ b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java
@@ -27,9 +27,9 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
import org.apache.sysds.test.TestUtils;
-public class ConstMatrix implements IGenerate<MatrixBlock> {
+public class ConstMatrix implements Const<MatrixBlock> {
- protected final MatrixBlock mb;
+ protected MatrixBlock mb;
protected final int nVal;
public ConstMatrix(MatrixBlock mb) {
@@ -55,6 +55,16 @@ public class ConstMatrix implements IGenerate<MatrixBlock> {
// do nothing
}
+ @Override
+ public final boolean isEmpty() {
+ return false;
+ }
+
+ @Override
+ public final int defaultWaitTime() {
+ return 0;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -68,13 +78,7 @@ public class ConstMatrix implements IGenerate<MatrixBlock> {
}
@Override
- public boolean isEmpty() {
- return false;
- }
-
- @Override
- public int defaultWaitTime() {
- return 0;
+ public void change(MatrixBlock t) {
+ mb = t;
}
-
}
diff --git
a/src/test/java/org/apache/sysds/performance/generators/FrameFile.java
b/src/test/java/org/apache/sysds/performance/generators/FrameFile.java
new file mode 100644
index 0000000000..d89a2589d7
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/generators/FrameFile.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance.generators;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderFactory;
+import org.apache.sysds.runtime.meta.MetaDataAll;
+
+public class FrameFile extends ConstFrame {
+
+ final private String path;
+
+ private FrameFile(String path, FrameBlock fb) {
+ super(fb);
+ this.path = path;
+ System.out.println("First 10 rows:");
+ System.out.println(fb.slice(0, 10));
+ }
+
+ public static FrameFile create(String path) throws Exception {
+
+ MetaDataAll mba = new MetaDataAll(path + ".mtd", false, true);
+ if(mba.mtdExists()) {
+ LOG.error(mba);
+
+ // DataCharacteristics ds = mba.getDataCharacteristics();
+ FileFormat f =
FileFormat.valueOf(mba.getFormatTypeString().toUpperCase());
+ ValueType[] schema = FrameObject.parseSchema(mba.getSchema());
+ FileFormatProperties p = null;
+ if(f.equals(FileFormat.CSV)){
+ p = new FileFormatPropertiesCSV();
+ ((FileFormatPropertiesCSV)p).setHeader(mba.getHasHeader());
+ }
+ FrameReader r = FrameReaderFactory.createFrameReader(f, p);
+ FrameBlock fb = r.readFrameFromHDFS(path, schema, mba.getDim1(),
mba.getDim2());
+ return new FrameFile(path, fb);
+ }
+ else {
+ LOG.error("No Mtd file found.. please add one. Fallback to CSV
reading with header");
+ // we assume csv
+ FrameReader r =
FrameReaderFactory.createFrameReader(FileFormat.CSV);
+ FrameBlock fb = r.readFrameFromHDFS(path, -1, -1);
+ return new FrameFile(path, fb);
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(super.toString());
+ sb.append(" From file: ");
+ sb.append(path);
+ return sb.toString();
+ }
+
+}
diff --git
a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
index ee39590bf3..7da382b7b8 100644
--- a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
+++ b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
@@ -19,11 +19,16 @@
package org.apache.sysds.performance.generators;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
* Generator interface for task generation.
*/
public interface IGenerate<T> {
+ public static final Log LOG =
LogFactory.getLog(IGenerate.class.getName());
+
/**
* Validate if the generator is empty, and we have to wait for elements.
*
diff --git
a/src/test/java/org/apache/sysds/performance/generators/MatrixFile.java
b/src/test/java/org/apache/sysds/performance/generators/MatrixFile.java
new file mode 100644
index 0000000000..0f85528ad2
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/generators/MatrixFile.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance.generators;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.runtime.io.MatrixReader;
+import org.apache.sysds.runtime.io.MatrixReaderFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MetaDataAll;
+
+public class MatrixFile extends ConstMatrix {
+
+ final private String path;
+
+ private MatrixFile(String path, MatrixBlock mb) {
+ super(mb);
+ this.path = path;
+ }
+
+ public static MatrixFile create(String path) throws Exception {
+
+ MetaDataAll mba = new MetaDataAll(path + ".mtd", false, true);
+ DataCharacteristics ds = mba.getDataCharacteristics();
+ FileFormat f =
FileFormat.valueOf(mba.getFormatTypeString().toUpperCase());
+
+ MatrixReader r = MatrixReaderFactory.createMatrixReader(f);
+ MatrixBlock mb = r.readMatrixFromHDFS(path, ds.getRows(),
ds.getCols(), ds.getBlocksize(), ds.getNonZeros());
+ return new MatrixFile(path, mb);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClass().getSimpleName());
+ sb.append(" From file: ");
+ sb.append(path);
+ return sb.toString();
+ }
+
+}