This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new 5e726cf [SYSTEMDS-55] Fix file format handling, docs, and github test config
5e726cf is described below
commit 5e726cfe3122336becde8c84800180dd211d935a
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat May 9 21:49:01 2020 +0200
[SYSTEMDS-55] Fix file format handling, docs, and github test config
Closes #907.
---
.github/workflows/documentation.yml | 5 +----
.../java/org/apache/sysds/parser/DataExpression.java | 10 ++++------
.../controlprogram/caching/CacheableData.java | 2 +-
.../context/SparkExecutionContext.java | 2 +-
.../instructions/cp/VariableCPInstruction.java | 20 +++++++++-----------
.../instructions/spark/ReblockSPInstruction.java | 13 +++++++++++--
.../instructions/spark/WriteSPInstruction.java | 1 -
.../functions/ExtractBlockForBinaryReblock.java | 2 +-
.../apache/sysds/runtime/io/FrameReaderFactory.java | 2 ++
.../test/functions/io/csv/FormatChangeTest.java | 3 +--
10 files changed, 31 insertions(+), 29 deletions(-)
diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml
index 201210f..81e5abd 100644
--- a/.github/workflows/documentation.yml
+++ b/.github/workflows/documentation.yml
@@ -21,10 +21,7 @@
name: Documentation
-on:
-  push:
-    branches:
-      - master
+on: [push, pull_request]
jobs:
documentation1:
diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java b/src/main/java/org/apache/sysds/parser/DataExpression.java
index 245af7e..c94532d 100644
--- a/src/main/java/org/apache/sysds/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysds/parser/DataExpression.java
@@ -1245,7 +1245,7 @@ public class DataExpression extends DataIdentifier
//validate read filename
if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()))
getOutput().setBlocksize(-1);
- else if (getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase("binary"))
+ else if (getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase(FileFormat.BINARY.toString()))
getOutput().setBlocksize(ConfigurationManager.getBlocksize());
else
raiseValidateError("Invalid format " +
getVarParam(FORMAT_TYPE)
@@ -2059,11 +2059,9 @@ public class DataExpression extends DataIdentifier
// if the read method parameter is a constant, then verify value matches MTD metadata file
if (getVarParam(key.toString()) != null && (getVarParam(key.toString()) instanceof ConstIdentifier)
&& !getVarParam(key.toString()).toString().equalsIgnoreCase(val.toString())) {
- raiseValidateError(
- "Parameter '" + key.toString()
- + "' has conflicting values in metadata and read statement. MTD file value: '"
- + val.toString() + "'. Read statement value: '" + getVarParam(key.toString()) + "'.",
- conditional);
+ raiseValidateError("Parameter '" + key.toString()
+ + "' has conflicting values in metadata and read statement. MTD file value: '"
+ + val.toString() + "'. Read statement value: '" + getVarParam(key.toString()) + "'.", conditional);
} else {
// if the read method does not specify parameter value, then add MTD metadata file value to parameter list
if (getVarParam(key.toString()) == null){
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 8d750a4..79fefc5 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -955,7 +955,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
throw new DMLRuntimeException("Unexpected error while writing mtd file (" + filePathAndName + ") -- metadata is null.");
// Write the matrix to HDFS in requested format
- FileFormat fmt = iimd.getFileFormat();
+ FileFormat fmt = (outputFormat != null) ? FileFormat.safeValueOf(outputFormat) : iimd.getFileFormat();
if ( fmt != FileFormat.MM ) {
// Get the dimension information from the metadata stored within MatrixObject
DataCharacteristics dc = iimd.getDataCharacteristics();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 17d9899..a1e2b92 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -517,7 +517,7 @@ public class SparkExecutionContext extends ExecutionContext
* in order to support the old transform implementation.
*
* @param fo frame object
- * @param inputInfo input info
+ * @param fmt file format type
* @return JavaPairRDD handle for a frame object
*/
@SuppressWarnings({ "unchecked", "resource" })
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 053d7ea..b550a8a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -879,8 +879,9 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
private void processWriteInstruction(ExecutionContext ec) {
//get filename (literal or variable expression)
String fname = ec.getScalarInput(getInput2().getName(), ValueType.STRING, getInput2().isLiteral()).getStringValue();
- if (!getInput3().getName().equalsIgnoreCase("libsvm"))
- {
+ String fmtStr = getInput3().getName();
+ FileFormat fmt = FileFormat.safeValueOf(fmtStr);
+ if( fmt != FileFormat.LIBSVM ) {
String desc = ec.getScalarInput(getInput4().getName(), ValueType.STRING, getInput4().isLiteral()).getStringValue();
_formatProperties.setDescription(desc);
}
@@ -889,28 +890,25 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
writeScalarToHDFS(ec, fname);
}
else if( getInput1().getDataType() == DataType.MATRIX ) {
- String outFmt = getInput3().getName();
- if (outFmt.equalsIgnoreCase("matrixmarket"))
+ if( fmt == FileFormat.MM )
writeMMFile(ec, fname);
- else if (outFmt.equalsIgnoreCase("csv") )
+ else if( fmt == FileFormat.CSV )
writeCSVFile(ec, fname);
else {
// Default behavior
MatrixObject mo = ec.getMatrixObject(getInput1().getName());
mo.setPrivacyConstraints(getPrivacyConstraint());
- mo.exportData(fname, outFmt, _formatProperties);
+ mo.exportData(fname, fmtStr, _formatProperties);
}
}
else if( getInput1().getDataType() == DataType.FRAME ) {
- String outFmt = getInput3().getName();
FrameObject mo = ec.getFrameObject(getInput1().getName());
- mo.exportData(fname, outFmt, _formatProperties);
+ mo.exportData(fname, fmtStr, _formatProperties);
}
else if( getInput1().getDataType() == DataType.TENSOR ) {
// TODO write tensor
- String outFmt = getInput3().getName();
TensorObject to = ec.getTensorObject(getInput1().getName());
- to.exportData(fname, outFmt, _formatProperties);
+ to.exportData(fname, fmtStr, _formatProperties);
}
}
@@ -973,7 +971,7 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
*/
private void writeMMFile(ExecutionContext ec, String fname) {
MatrixObject mo = ec.getMatrixObject(getInput1().getName());
- String outFmt = "matrixmarket";
+ String outFmt = FileFormat.MM.toString();
if(mo.isDirty()) {
// there exist data computed in CP that is not backed up on HDFS
// i.e., it is either in-memory or in evicted space
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 33e7e18..99602ae 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -42,6 +42,7 @@ import org.apache.sysds.runtime.io.FileFormatPropertiesMM;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixCell;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
@@ -154,8 +155,16 @@ public class ReblockSPInstruction extends UnarySPInstruction {
csvInstruction.processInstruction(sec);
return;
}
- else if(fmt == FileFormat.BINARY)
- {
+ else if(fmt == FileFormat.BINARY && mc.getBlocksize() <= 0) {
+ //BINARY BLOCK <- BINARY CELL (e.g., after grouped aggregate)
+ JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = (JavaPairRDD<MatrixIndexes, MatrixCell>) sec.getRDDHandleForMatrixObject(mo, FileFormat.BINARY);
+ JavaPairRDD<MatrixIndexes, MatrixBlock> out = RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), binaryCells, mcOut, outputEmptyBlocks);
+
+ //put output RDD handle into symbol table
+ sec.setRDDHandleForVariable(output.getName(), out);
+ sec.addLineageRDD(output.getName(), input1.getName());
+ }
+ else if(fmt == FileFormat.BINARY) {
//BINARY BLOCK <- BINARY BLOCK (different sizes)
JavaPairRDD<MatrixIndexes, MatrixBlock> in1 = sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName());
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index 6abd7d8..9257213 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -81,7 +81,6 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
throw new DMLRuntimeException("Invalid number of operands in write instruction: " + str);
}
-
//SPARK°write°_mVar2·MATRIX·DOUBLE°./src/test/scripts/functions/data/out/B·SCALAR·STRING·true°matrixmarket·SCALAR·STRING·true
// _mVar2·MATRIX·DOUBLE
CPOperand in1 = new CPOperand(parts[1]);
CPOperand in2 = new CPOperand(parts[2]);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
index 339afb3..928a293 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
@@ -46,7 +46,7 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<
//sanity check block sizes
if(in_blen <= 0 || out_blen <= 0)
- throw new DMLRuntimeException("Block sizes not unknown:" + in_blen + "," + out_blen);
+ throw new DMLRuntimeException("Block sizes unknown:" + in_blen + ", " + out_blen);
}
@Override
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java
index e73e607..4510133 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java
@@ -57,6 +57,8 @@ public class FrameReaderFactory
reader = new FrameReaderBinaryBlockParallel();
else
reader = new FrameReaderBinaryBlock();
+ break;
+
default:
throw new DMLRuntimeException(
"Failed to create frame reader for
unknown format: " + fmt.toString());
diff --git a/src/test/java/org/apache/sysds/test/functions/io/csv/FormatChangeTest.java b/src/test/java/org/apache/sysds/test/functions/io/csv/FormatChangeTest.java
index 84162fa..985baa8 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/csv/FormatChangeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/csv/FormatChangeTest.java
@@ -37,7 +37,6 @@ import org.apache.sysds.test.TestUtils;
@net.jcip.annotations.NotThreadSafe
public class FormatChangeTest extends AutomatedTestBase
{
-
private final static String TEST_NAME = "csv_test";
private final static String TEST_DIR = "functions/io/csv/";
private final static String TEST_CLASS_DIR = TEST_DIR + FormatChangeTest.class.getSimpleName() + "/";
@@ -144,7 +143,7 @@ public class FormatChangeTest extends AutomatedTestBase
programArgs[3] = binFile;
programArgs[4] = "binary";
runTest(true, false, null, -1);
-
+
// Test TextCell -> CSV conversion
System.out.println("TextCell -> CSV");
programArgs[2] = "text";