This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new c606fee [SYSTEMDS-2987] Read/write of composite list objects (all formats)
c606fee is described below
commit c606feedef1da26187e2722f936a43c7d6f2f448
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Jun 5 22:04:40 2021 +0200
[SYSTEMDS-2987] Read/write of composite list objects (all formats)
This patch adds the missing support for reading and writing regular
and named lists. Included objects can be matrices, tensors, frames,
scalars, and even recursively nested list objects. In detail, we write
a directory with a proper mtd file, and then use the existing
readers/writers for storing the individual objects. This has the
advantage that we naturally (and consistently) support all formats as
well as single- and multi-threaded read/write, and that the separate
files (each with its own mtd file) can even be processed individually.
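
For example, a list write/read round trip becomes symmetric to
matrices and frames. A minimal DML sketch (mirroring the included
ListWrite.dml/ListRead.dml tests; the output path tmp/L is
hypothetical):

  X = rand(rows=3, cols=3, seed=7);
  L = list(X, sum(X));                # regular list of matrix, scalar
  write(L, "tmp/L", format="binary"); # dir tmp/L/ plus tmp/L.mtd
  L2 = read("tmp/L");                 # restores list and all objects
  print(sum(as.matrix(L2[1])) + as.scalar(L2[2]));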
---
src/main/java/org/apache/sysds/common/Types.java | 1 +
src/main/java/org/apache/sysds/lops/Data.java | 2 +-
.../java/org/apache/sysds/lops/compile/Dag.java | 3 +-
.../org/apache/sysds/parser/DataExpression.java | 22 ++--
.../java/org/apache/sysds/parser/Expression.java | 4 +
.../sysds/runtime/instructions/cp/ListObject.java | 8 ++
.../instructions/cp/VariableCPInstruction.java | 43 +++----
.../apache/sysds/runtime/io/IOUtilFunctions.java | 18 +++
.../org/apache/sysds/runtime/io/ListReader.java | 131 +++++++++++++++++++++
.../org/apache/sysds/runtime/io/ListWriter.java | 85 +++++++++++++
.../org/apache/sysds/runtime/meta/MetaDataAll.java | 5 +
.../org/apache/sysds/runtime/util/HDFSTool.java | 15 ++-
.../sysds/test/functions/io/FullDynWriteTest.java | 2 +-
.../sysds/test/functions/io/ReadWriteListTest.java | 128 ++++++++++++++++++++
src/test/scripts/functions/io/ListRead.dml | 41 +++++++
src/test/scripts/functions/io/ListWrite.dml | 34 ++++++
16 files changed, 504 insertions(+), 38 deletions(-)
diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java
index 5f077f6..77e79f6 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -109,6 +109,7 @@ public class Types
case "INT": return INT64;
case "BOOLEAN": return BOOLEAN;
case "STRING": return STRING;
+ case "UNKNOWN": return UNKNOWN;
default:
throw new DMLRuntimeException("Unknown
value type: "+value);
}
diff --git a/src/main/java/org/apache/sysds/lops/Data.java b/src/main/java/org/apache/sysds/lops/Data.java
index f40af4e..2bd302d 100644
--- a/src/main/java/org/apache/sysds/lops/Data.java
+++ b/src/main/java/org/apache/sysds/lops/Data.java
@@ -382,7 +382,7 @@ public class Data extends Lop
}
public String getCreateVarInstructions(String outputFileName, String outputLabel) {
- if ( getDataType() == DataType.MATRIX || getDataType() == DataType.FRAME ) {
+ if ( getDataType() == DataType.MATRIX || getDataType() == DataType.FRAME || getDataType() == DataType.LIST ) {
if ( _op.isTransient() )
throw new LopsException("getInstructions()
should not be called for transient nodes.");
diff --git a/src/main/java/org/apache/sysds/lops/compile/Dag.java
b/src/main/java/org/apache/sysds/lops/compile/Dag.java
index cfe409e..469c600 100644
--- a/src/main/java/org/apache/sysds/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysds/lops/compile/Dag.java
@@ -343,7 +343,8 @@ public class Dag<N extends Lop>
if (n.isDataExecLocation()
&& !((Data) n).getOperationType().isTransient()
&& ((Data) n).getOperationType().isRead()
- && (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME) )
+ && (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME
+ || n.getDataType() == DataType.LIST) )
{
if ( !((Data)n).isLiteral() ) {
try {
diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java b/src/main/java/org/apache/sysds/parser/DataExpression.java
index 2bc1376..0f4352a 100644
--- a/src/main/java/org/apache/sysds/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysds/parser/DataExpression.java
@@ -1298,8 +1298,14 @@ public class DataExpression extends DataIdentifier
getOutput().setNnz(-1L);
setPrivacy();
}
+ else if ( dataTypeString.equalsIgnoreCase(DataType.LIST.name())) {
+ getOutput().setDataType(DataType.LIST);
+ setPrivacy();
+ }
else{
- raiseValidateError("Unknown Data Type " +
dataTypeString + ". Valid values: " + Statement.SCALAR_DATA_TYPE +", " +
Statement.MATRIX_DATA_TYPE, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+ raiseValidateError("Unknown Data Type " +
dataTypeString + ". Valid values: "
+ + Statement.SCALAR_DATA_TYPE +", " +
Statement.MATRIX_DATA_TYPE+", " + Statement.FRAME_DATA_TYPE
+ +", " +
DataType.LIST.name().toLowerCase(), conditional,
LanguageErrorCodes.INVALID_PARAMETERS);
}
// handle value type parameter
@@ -1310,17 +1316,19 @@ public class DataExpression extends DataIdentifier
// Identify the value type (used only for read method)
String valueTypeString = getVarParam(VALUETYPEPARAM) == null ? null : getVarParam(VALUETYPEPARAM).toString();
if (valueTypeString != null) {
- if (valueTypeString.equalsIgnoreCase(Statement.DOUBLE_VALUE_TYPE)) {
+ if (valueTypeString.equalsIgnoreCase(Statement.DOUBLE_VALUE_TYPE))
getOutput().setValueType(ValueType.FP64);
- } else if (valueTypeString.equalsIgnoreCase(Statement.STRING_VALUE_TYPE)) {
+ else if (valueTypeString.equalsIgnoreCase(Statement.STRING_VALUE_TYPE))
getOutput().setValueType(ValueType.STRING);
- } else if (valueTypeString.equalsIgnoreCase(Statement.INT_VALUE_TYPE)) {
+ else if (valueTypeString.equalsIgnoreCase(Statement.INT_VALUE_TYPE))
getOutput().setValueType(ValueType.INT64);
- } else if (valueTypeString.equalsIgnoreCase(Statement.BOOLEAN_VALUE_TYPE)) {
+ else if (valueTypeString.equalsIgnoreCase(Statement.BOOLEAN_VALUE_TYPE))
getOutput().setValueType(ValueType.BOOLEAN);
- } else {
+ else if (valueTypeString.equalsIgnoreCase(ValueType.UNKNOWN.name()))
+ getOutput().setValueType(ValueType.UNKNOWN);
+ else {
raiseValidateError("Unknown Value Type " + valueTypeString
- + ". Valid values are: " + Statement.DOUBLE_VALUE_TYPE +", " + Statement.INT_VALUE_TYPE + ", " + Statement.BOOLEAN_VALUE_TYPE + ", " + Statement.STRING_VALUE_TYPE, conditional);
+ + ". Valid values are: " + Statement.DOUBLE_VALUE_TYPE +", " + Statement.INT_VALUE_TYPE + ", " + Statement.BOOLEAN_VALUE_TYPE + ", " + Statement.STRING_VALUE_TYPE, conditional);
}
} else {
getOutput().setValueType(ValueType.FP64);
diff --git a/src/main/java/org/apache/sysds/parser/Expression.java b/src/main/java/org/apache/sysds/parser/Expression.java
index e7b49ca..d8f0580 100644
--- a/src/main/java/org/apache/sysds/parser/Expression.java
+++ b/src/main/java/org/apache/sysds/parser/Expression.java
@@ -302,6 +302,10 @@ public abstract class Expression implements ParseInfo
public static ValueType computeValueType(Expression expr1, ValueType v1, ValueType v2, boolean cast) {
if (v1 == v2)
return v1;
+ if (v1 == ValueType.UNKNOWN && v2 != ValueType.UNKNOWN)
+ return v2;
+ if (v1 != ValueType.UNKNOWN && v2 == ValueType.UNKNOWN)
+ return v1;
if (cast) {
if (v1 == ValueType.FP64 && v2 == ValueType.INT64)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java
index 392e9cf..edfd6cc 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java
@@ -61,10 +61,18 @@ public class ListObject extends Data implements Externalizable {
public ListObject(List<Data> data) {
this(data, null, null);
}
+
+ public ListObject(Data[] data) {
+ this(Arrays.asList(data), null, null);
+ }
public ListObject(List<Data> data, List<String> names) {
this(data, names, null);
}
+
+ public ListObject(Data[] data, String[] names) {
+ this(Arrays.asList(data), Arrays.asList(names), null);
+ }
public ListObject(List<Data> data, List<String> names,
List<LineageItem> lineage) {
super(DataType.LIST, ValueType.UNKNOWN);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 3457dd4..a806741 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -52,6 +52,8 @@ import org.apache.sysds.runtime.io.FileFormatProperties;
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM;
import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.io.ListReader;
+import org.apache.sysds.runtime.io.ListWriter;
import org.apache.sysds.runtime.io.WriterMatrixMarket;
import org.apache.sysds.runtime.io.WriterTextCSV;
import org.apache.sysds.runtime.lineage.LineageItem;
@@ -371,7 +373,7 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
}
MetaDataFormat iimd = null;
- if (dt == DataType.MATRIX || dt == DataType.FRAME) {
+ if (dt == DataType.MATRIX || dt == DataType.FRAME || dt == DataType.LIST) {
DataCharacteristics mc = new MatrixCharacteristics();
if (parts.length == 6) {
// do nothing
@@ -660,6 +662,12 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
ec.setVariable(getInput1().getName(), fobj);
break;
}
+ case LIST: {
+ ListObject lo = ListReader.readListFromHDFS(getInput2().getName(),
+ ((MetaDataFormat)metadata).getFileFormat().name(), _formatProperties);
+ ec.setVariable(getInput1().getName(), lo);
+ break;
+ }
case SCALAR: {
//created variable not called for scalars
ec.setScalarOutput(getInput1().getName(), null);
@@ -912,29 +920,8 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
* @param ec execution context
*/
private void processReadInstruction(ExecutionContext ec){
- ScalarObject res = null;
- try {
- switch(getInput1().getValueType()) {
- case FP64:
- res = new DoubleObject(HDFSTool.readDoubleFromHDFSFile(getInput2().getName()));
- break;
- case INT64:
- res = new IntObject(HDFSTool.readIntegerFromHDFSFile(getInput2().getName()));
- break;
- case BOOLEAN:
- res = new BooleanObject(HDFSTool.readBooleanFromHDFSFile(getInput2().getName()));
- break;
- case STRING:
- res = new StringObject(HDFSTool.readStringFromHDFSFile(getInput2().getName()));
- break;
- default:
- throw new DMLRuntimeException("Invalid value type ("
- + getInput1().getValueType() + ") while processing readScalar instruction.");
- }
- } catch ( IOException e ) {
- throw new DMLRuntimeException(e);
- }
- ec.setScalarOutput(getInput1().getName(), res);
+ ec.setScalarOutput(getInput1().getName(),
+ HDFSTool.readScalarObjectFromHDFSFile(getInput2().getName(), getInput1().getValueType()));
}
/**
@@ -1012,6 +999,10 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
setPrivacyConstraint(to.getPrivacyConstraint());
to.exportData(fname, fmtStr, _formatProperties);
}
+ else if( getInput1().getDataType() == DataType.LIST ) {
+ ListObject lo = ec.getListObject(getInput1().getName());
+ ListWriter.writeListToHDFS(lo, fname, fmtStr, _formatProperties);
+ }
}
/**
@@ -1162,8 +1153,8 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace
Path path = new Path(fname);
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
}
-
- } catch ( IOException e ) {
+ }
+ catch ( IOException e ) {
throw new DMLRuntimeException(e);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index de74e81..df71dea 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -559,6 +559,24 @@ public class IOUtilFunctions
return ret;
}
+ public static Path[] getMetadataFilePaths( FileSystem fs, Path file )
+ throws IOException
+ {
+ Path[] ret = null;
+ if( fs.isDirectory(file) || IOUtilFunctions.isObjectStoreFileScheme(file) ) {
+ LinkedList<Path> tmp = new LinkedList<>();
+ FileStatus[] dStatus = fs.listStatus(file);
+ for( FileStatus fdStatus : dStatus )
+ if( fdStatus.getPath().toString().endsWith(".mtd") ) //mtd file
+ tmp.add(fdStatus.getPath());
+ ret = tmp.toArray(new Path[0]);
+ }
+ else {
+ throw new DMLRuntimeException("Unable to read meta data
files from directory "+file.toString());
+ }
+ return ret;
+ }
+
/**
* Delete the CRC files from the local file system associated with a
* particular file and its metadata file.
diff --git a/src/main/java/org/apache/sysds/runtime/io/ListReader.java b/src/main/java/org/apache/sysds/runtime/io/ListReader.java
new file mode 100644
index 0000000..1569658
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/ListReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.io;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MetaDataAll;
+import org.apache.sysds.runtime.meta.MetaDataFormat;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+public class ListReader
+{
+ /**
+ * Reads a list object and all contained objects from a folder with related meta data.
+ * The individual objects (including nested lists) are read with existing matrix/frame
+ * readers and meta data such that the entire list and separate objects can be restored.
+ * By using the existing readers, all formats are naturally supported and we can ensure
+ * consistency of the on-disk representation.
+ *
+ * @param fname directory name
+ * @param fmtStr format string
+ * @param props file format properties
+ * @return list object
+ * @throws DMLRuntimeException
+ */
+ public static ListObject readListFromHDFS(String fname, String fmtStr, FileFormatProperties props)
+ throws DMLRuntimeException
+ {
+ MetaDataAll meta = new MetaDataAll(fname+".mtd", false, true);
+ int numObjs = (int) meta.getDim1();
+ boolean named = false;
+
+ Data[] data = null;
+ String[] names = null;
+ try {
+ // read all meta data files
+ Path dirPath = new Path(fname);
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ FileSystem fs = IOUtilFunctions.getFileSystem(dirPath, job);
+ Path[] mtdFiles = IOUtilFunctions.getMetadataFilePaths(fs, dirPath);
+ if( numObjs != mtdFiles.length ) {
+ throw new DMLRuntimeException("List meta data does not match "
+ + "available mtd files: "+numObjs+" vs "+mtdFiles.length);
+ }
+
+ // determine if regular or named list
+ named = Arrays.stream(mtdFiles).map(p -> p.toString())
+ .anyMatch(s ->
!s.substring(s.lastIndexOf('_')).equals("null"));
+ data = new Data[numObjs];
+ names = named ? new String[numObjs] : null;
+
+ // read all individual files (but only create objects for
+ // matrices and frames, which are then read on demand via acquire())
+ for( int i=0; i<numObjs; i++ ) {
+ MetaDataAll lmeta = new MetaDataAll(mtdFiles[i].toString(), false, true);
+ String lfname = lmeta.getFilename().substring(0, lmeta.getFilename().length()-4);
+ DataCharacteristics dc = lmeta.getDataCharacteristics();
+ FileFormat fmt = lmeta.getFileFormat();
+ Data dat = null;
+ switch( lmeta.getDataType() ) {
+ case MATRIX:
+ dat = new MatrixObject(lmeta.getValueType(), lfname);
+ break;
+ case TENSOR:
+ dat = new TensorObject(lmeta.getValueType(), lfname);
+ break;
+ case FRAME:
+ dat = new FrameObject(lfname);
+ if( lmeta.getSchema() != null )
+ ((FrameObject)dat).setSchema(lmeta.getSchema());
+ break;
+ case LIST:
+ dat = ListReader.readListFromHDFS(lfname, fmt.toString(), props);
+ break;
+ case SCALAR:
+ dat = HDFSTool.readScalarObjectFromHDFSFile(lfname, lmeta.getValueType());
+ break;
+ default:
+ throw new DMLRuntimeException("Unexpected data type: " + lmeta.getDataType());
+ }
+
+ if(dat instanceof CacheableData<?>) {
+ ((CacheableData<?>)dat).setMetaData(new
MetaDataFormat(dc, fmt));
+
((CacheableData<?>)dat).enableCleanup(false); // disable delete
+ }
+
+ String[] parts = lfname.substring(lfname.lastIndexOf("/")+1).split("_");
+ data[Integer.parseInt(parts[0])] = dat;
+ if( named )
+ names[Integer.parseInt(parts[0])] = parts[1];
+ }
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(
+ "Failed to read list object of length "+numObjs+".", ex);
+ }
+
+ // construct list object
+ return named ? new ListObject(data, names) : new ListObject(data);
+ }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/io/ListWriter.java b/src/main/java/org/apache/sysds/runtime/io/ListWriter.java
new file mode 100644
index 0000000..d5b95d7
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/ListWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.io;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+public class ListWriter
+{
+ /**
+ * Writes a list object and all contained objects to a folder with related meta data.
+ * The individual objects (including nested lists) are written with existing matrix/frame
+ * writers and meta data such that the entire list and separate objects can be read back.
+ * By using the existing writers, all formats are naturally supported and we can ensure
+ * consistency of the on-disk representation.
+ *
+ * @param lo list object
+ * @param fname directory name
+ * @param fmtStr format string
+ * @param props file format properties
+ * @throws DMLRuntimeException
+ */
+ public static void writeListToHDFS(ListObject lo, String fname, String fmtStr, FileFormatProperties props)
+ throws DMLRuntimeException
+ {
+ DataCharacteristics dc = new MatrixCharacteristics(lo.getLength(), 1, 0, 0);
+
+ try {
+ //write basic list meta data
+ HDFSTool.writeMetaDataFile(fname + ".mtd",
lo.getValueType(), null,
+ lo.getDataType(), dc,
FileFormat.safeValueOf(fmtStr),
+ props, lo.getPrivacyConstraint());
+
+ //create folder for list w/ appropriate permissions
+ HDFSTool.createDirIfNotExistOnHDFS(fname,
+ DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+
+ //write regular/named list by position/position_name
+ //TODO additional parallelization over objects (in addition to parallel writers)
+ for(int i=0; i<lo.getLength(); i++) {
+ Data dat = lo.getData(i);
+ String lfname = fname +"/"+i+"_"+(lo.isNamedList()?lo.getName(i):"null");
+ if( dat instanceof CacheableData<?> )
+ ((CacheableData<?>)dat).exportData(lfname, fmtStr, props);
+ else if( dat instanceof ListObject )
+ writeListToHDFS((ListObject)dat, lfname, fmtStr, props);
+ else { //scalar
+ ScalarObject so = (ScalarObject) dat;
+ HDFSTool.writeObjectToHDFS(so.getValue(), lfname);
+ HDFSTool.writeScalarMetaDataFile(lfname +".mtd",
+ so.getValueType(), so.getPrivacyConstraint());
+ }
+ }
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(
+ "Failed to write list object of length
"+dc.getRows()+".", ex);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
index 62bd1f9..d312289 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
@@ -87,6 +87,7 @@ public class MetaDataAll extends DataIdentifier {
}
public MetaDataAll(String mtdFileName, boolean conditional, boolean parseMeta) {
+ setFilename(mtdFileName);
_metaObj = readMetadataFile(mtdFileName, conditional);
setPrivacy(PrivacyConstraint.PrivacyLevel.None);
if(parseMeta)
@@ -235,6 +236,10 @@ public class MetaDataAll extends DataIdentifier {
if(_formatTypeString != null &&
EnumUtils.isValidEnum(Types.FileFormat.class, _formatTypeString.toUpperCase()))
setFileFormat(Types.FileFormat.safeValueOf(_formatTypeString));
}
+
+ public DataCharacteristics getDataCharacteristics() {
+ return new MatrixCharacteristics(getDim1(), getDim2(), getBlocksize(), getNnz());
+ }
@SuppressWarnings("unchecked")
public HashMap<String, Expression> parseMetaDataFileParameters(String
mtdFileName, boolean conditional, HashMap<String, Expression> varParams)
diff --git a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
index f19f1ca..b44b7ce 100644
--- a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
@@ -41,6 +41,8 @@ import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.parser.DataExpression;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
import org.apache.sysds.runtime.io.BinaryBlockSerialization;
import org.apache.sysds.runtime.io.FileFormatProperties;
import org.apache.sysds.runtime.io.IOUtilFunctions;
@@ -308,11 +310,20 @@ public class HDFSTool
default: return line;
}
}
-
+
+ public static ScalarObject readScalarObjectFromHDFSFile(String fname, ValueType vt) {
+ try {
+ return ScalarObjectFactory.createScalarObject(vt, readObjectFromHDFSFile(fname, vt));
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ }
+
private static BufferedWriter setupOutputFile ( String filename ) throws IOException {
Path path = new Path(filename);
FileSystem fs = IOUtilFunctions.getFileSystem(path);
- BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
+ BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
return br;
}
diff --git a/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java
index b1798f7..e2f65cb 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java
@@ -107,7 +107,7 @@ public class FullDynWriteTest extends AutomatedTestBase
programArgs = new String[]{ "-explain","-args",
input("A"), fmt.toString(), outputDir()};
- try
+ try
{
long seed1 = System.nanoTime();
double[][] A = getRandomMatrix(rows, cols, 0, 1, 1.0, seed1);
diff --git a/src/test/java/org/apache/sysds/test/functions/io/ReadWriteListTest.java b/src/test/java/org/apache/sysds/test/functions/io/ReadWriteListTest.java
new file mode 100644
index 0000000..ad3415c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/ReadWriteListTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+public class ReadWriteListTest extends AutomatedTestBase
+{
+ private final static String TEST_NAME1 = "ListWrite";
+ private final static String TEST_NAME2 = "ListRead";
+ private final static String TEST_DIR = "functions/io/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + ReadWriteListTest.class.getSimpleName() + "/";
+ private final static double eps = 1e-6;
+
+ private final static int rows = 350;
+ private final static int cols = 110;
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"L","R1"}) );
+ addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"R2"}) );
+ }
+
+ @Test
+ public void testListBinarySinglenode() {
+ runListReadWriteTest(false, FileFormat.BINARY, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testListBinaryHybrid() {
+ runListReadWriteTest(false, FileFormat.BINARY, ExecMode.HYBRID);
+ }
+
+ @Test
+ public void testListTextSinglenode() {
+ runListReadWriteTest(false, FileFormat.TEXT, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testListTextHybrid() {
+ runListReadWriteTest(false, FileFormat.TEXT, ExecMode.HYBRID);
+ }
+
+ @Test
+ public void testNamedListBinarySinglenode() {
+ runListReadWriteTest(true, FileFormat.BINARY, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testNamedListBinaryHybrid() {
+ runListReadWriteTest(true, FileFormat.BINARY, ExecMode.HYBRID);
+ }
+
+ @Test
+ public void testNamedListTextSinglenode() {
+ runListReadWriteTest(true, FileFormat.TEXT, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testNamedListTextHybrid() {
+ runListReadWriteTest(true, FileFormat.TEXT, ExecMode.HYBRID);
+ }
+
+ //TODO support for Spark write/read
+
+ private void runListReadWriteTest(boolean named, FileFormat format, ExecMode mode)
+ {
+ ExecMode modeOld = setExecMode(mode);
+
+ try {
+ TestConfiguration config = getTestConfiguration(TEST_NAME1);
+ loadTestConfiguration(config);
+
+ //run write
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+ programArgs = new String[]{"-args",
String.valueOf(rows),
+ String.valueOf(cols), output("R1"),
output("L"), format.toString(), String.valueOf(named)};
+
+ runTest(true, false, null, -1);
+ double val1 = HDFSTool.readDoubleFromHDFSFile(output("R1"));
+
+ //run read
+ fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
+ programArgs = new String[]{"-args", output("L"),
output("R2")};
+
+ runTest(true, false, null, -1);
+ double val2 = HDFSTool.readDoubleFromHDFSFile(output("R2"));
+
+ Assert.assertEquals(new Double(val1), new Double(val2), eps);
+ }
+ catch(IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ finally {
+ resetExecMode(modeOld);
+ }
+ }
+}
diff --git a/src/test/scripts/functions/io/ListRead.dml b/src/test/scripts/functions/io/ListRead.dml
new file mode 100644
index 0000000..6a5910a
--- /dev/null
+++ b/src/test/scripts/functions/io/ListRead.dml
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+L = read($1);
+W1 = as.matrix(L[1]);
+W2 = as.matrix(L[2]);
+s3 = as.scalar(L[3]);
+W4 = as.matrix(L[4]);
+R1 = sum(W1 * W2 + s3 * W4);
+
+# cut and reexecute to ensure that
+# individual files were not deleted
+while(FALSE){}
+Lb = read($1);
+W1b = as.matrix(Lb[1]);
+W2b = as.matrix(Lb[2]);
+s3b = as.scalar(Lb[3]);
+W4b = as.matrix(Lb[4]);
+
+R2 = sum(W1b * W2b + s3b * W4b);
+R3 = R1/2 + R2/2
+
+write(R3, $2, format="text");
diff --git a/src/test/scripts/functions/io/ListWrite.dml b/src/test/scripts/functions/io/ListWrite.dml
new file mode 100644
index 0000000..bb55635
--- /dev/null
+++ b/src/test/scripts/functions/io/ListWrite.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+nr = $1
+nc = $2
+
+W1 = rand(rows=nr, cols=nc, seed=1);
+W2 = rand(rows=nr, cols=nc, seed=2);
+s3 = nr * nc;
+W4 = rand(rows=nr, cols=nc, seed=3);
+
+R1 = sum(W1 * W2 + s3 * W4);
+L = list(W1, W2, s3, W4);
+
+write(R1, $3, format="text");
+write(L, $4, format=$5)