This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 9241488e0c2ec3027d6157243f0a755f3cec7e16 Author: baunsgaard <[email protected]> AuthorDate: Sat Sep 17 23:21:00 2022 +0200 [SYSTEMDS-2699] CLA IO Compressed Matrix This commit adds the basic blocks for writing a compressed matrix to disk, and adds a basic test for the case of writing a matrix and reading it back from disk. Further testing and full integration into DML are needed, as well as a mechanism to detect whether the format of the compression groups has changed. --- .gitignore | 1 + src/main/java/org/apache/sysds/common/Types.java | 1 + .../runtime/compress/CompressedMatrixBlock.java | 28 +++++ .../runtime/compress/io/ReaderCompressed.java | 99 ++++++++++++++++ .../runtime/compress/io/WriterCompressed.java | 95 +++++++++++++++ .../sysds/runtime/io/MatrixReaderFactory.java | 5 + .../sysds/runtime/io/MatrixWriterFactory.java | 6 +- .../sysds/test/component/compress/io/IOTest.java | 131 +++++++++++++++++++++ 8 files changed, 365 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f463565a38..89df46df01 100644 --- a/.gitignore +++ b/.gitignore @@ -56,6 +56,7 @@ docs/_site # Test Artifacts src/test/scripts/**/*.dmlt src/test/scripts/functions/mlcontextin/ +src/test/java/org/apache/sysds/test/component/compress/io/files .factorypath # Excluded sources diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java index a9b8108600..1edc511cc3 100644 --- a/src/main/java/org/apache/sysds/common/Types.java +++ b/src/main/java/org/apache/sysds/common/Types.java @@ -539,6 +539,7 @@ public class Types TEXT, // text cell IJV representation (mm w/o header) MM, // text matrix market IJV representation CSV, // text dense representation + COMPRESSED, // Internal SYSTEMDS compressed format LIBSVM, // text libsvm sparse row representation JSONL, // text nested JSON (Line) representation BINARY, // binary block representation (dense/sparse/ultra-sparse) diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 57ab75cae4..6d795963d4 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -168,6 +168,25 @@ public class CompressedMatrixBlock extends MatrixBlock { decompressedVersion = new SoftReference<>(uncompressedMatrixBlock); } + /** + * Direct constructor with everything. + * + * @param rl Number of rows in the block + * @param cl Number of columns + * @param nnz Number of non zeros + * @param overlapping If the matrix is overlapping + * @param groups The list of column groups + */ + protected CompressedMatrixBlock(int rl, int cl, long nnz, boolean overlapping, List<AColGroup> groups) { + super(true); + this.rlen = rl; + this.clen = cl; + this.sparse = false; + this.nonZeros = nnz; + this.overlappingColGroups = overlapping; + this._colGroups = groups; + } + @Override public void reset(int rl, int cl, boolean sp, long estnnz, double val) { throw new DMLCompressionException("Invalid to reset a Compressed MatrixBlock"); @@ -370,6 +389,15 @@ public class CompressedMatrixBlock extends MatrixBlock { _colGroups = ColGroupIO.readGroups(in, rlen); } + public static CompressedMatrixBlock read(DataInput in) throws IOException { + int rlen = in.readInt(); + int clen = in.readInt(); + long nonZeros = in.readLong(); + boolean overlappingColGroups = in.readBoolean(); + List<AColGroup> groups = ColGroupIO.readGroups(in, rlen); + return new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups); + } + @Override public void write(DataOutput out) throws IOException { if(nonZeros > 0 && getExactSizeOnDisk() > MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros)) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java new file mode 100644 index 0000000000..24db387718 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.runtime.compress.io; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.io.IOUtilFunctions; +import org.apache.sysds.runtime.io.MatrixReader; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +public class ReaderCompressed extends MatrixReader { + + public static ReaderCompressed create() { + return new ReaderCompressed(); + } + + public static MatrixBlock readCompressedMatrixFromHDFS(String fname) throws IOException { + return create().readMatrixFromHDFS(fname, 0, 0, 0, 0); + } + + @Override + public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz) + throws IOException, DMLRuntimeException { + + JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + Path path = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + + checkValidInputFile(fs, path); + + MatrixBlock cmb = readCompressedMatrix(path, job, fs); + + if(cmb.getNumRows() != rlen) + LOG.warn("Metadata file does not correlate with compressed file, NRows : " + cmb.getNumRows() + " vs " + rlen); + if(cmb.getNumColumns() != clen) + LOG.warn( + "Metadata file does not correlate with compressed file, NCols : " + cmb.getNumColumns() + " vs " + clen); + + return cmb; + } + + @Override + public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz) + throws IOException, DMLRuntimeException { + throw new NotImplementedException("Not implemented reading compressedMatrix from input stream"); + } + + private static 
MatrixBlock readCompressedMatrix(Path path, JobConf job, FileSystem fs) throws IOException { + if(fs.getFileStatus(path).isDirectory()) + return readCompressedMatrixFolder(path, job, fs); + else + return readCompressedMatrixSingleFile(path, job, fs); + } + + private static MatrixBlock readCompressedMatrixFolder(Path path, JobConf job, FileSystem fs) { + throw new NotImplementedException(); + } + + private static MatrixBlock readCompressedMatrixSingleFile(Path path, JobConf job, FileSystem fs) throws IOException { + final InputStream is = fs.open(path); + final DataInput in = new DataInputStream(is); + MatrixBlock ret; + try { + ret = CompressedMatrixBlock.read(in); + } + finally { + is.close(); + } + return ret; + } + +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java new file mode 100644 index 0000000000..458ee5df31 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.runtime.compress.io; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.io.FileFormatProperties; +import org.apache.sysds.runtime.io.IOUtilFunctions; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.HDFSTool; + +public class WriterCompressed extends MatrixWriter { + + public static WriterCompressed create(FileFormatProperties props) { + return new WriterCompressed(); + } + + public static void writeCompressedMatrixToHDFS(MatrixBlock src, String fname) throws IOException { + create(null).writeMatrixToHDFS(src, fname, 0, 0, 0, 0, false); + } + + @Override + public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag) + throws IOException { + JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + Path path = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + + HDFSTool.deleteFileIfExistOnHDFS(fname); + try { + writeCompressedMatrixToHDFS(path, job, fs, src); + } + catch(DMLCompressionException ce) { + fs.delete(path, true); + throw ce; + } + finally { + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); + } + } + + @Override + public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen) throws IOException { + throw new NotImplementedException(); + } + + private void 
writeCompressedMatrixToHDFS(Path path, JobConf conf, FileSystem fs, MatrixBlock src) + throws IOException { + final OutputStream os = fs.create(path, true); + final DataOutput out = new DataOutputStream(os); + try { + final MatrixBlock mb = src instanceof CompressedMatrixBlock ? // If compressed + src : // Do not compress + CompressedMatrixBlockFactory.compress(src).getLeft(); // otherwise compress + + if(!(mb instanceof CompressedMatrixBlock)) + throw new DMLCompressionException("Input was not compressed, therefore the file was not saved to disk"); + + CompressedMatrixBlock cmb = (CompressedMatrixBlock) mb; + cmb.write(out); + } + finally { + os.close(); + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java index 473f6441a2..bcf5ac6e48 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java @@ -25,6 +25,7 @@ import org.apache.sysds.common.Types.FileFormat; import org.apache.sysds.conf.CompilerConfig.ConfigType; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.compress.io.ReaderCompressed; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -66,6 +67,10 @@ public class MatrixReaderFactory { reader = (par & mcsr) ? 
new ReaderHDF5Parallel( new FileFormatPropertiesHDF5()) : new ReaderHDF5(new FileFormatPropertiesHDF5()); break; + + case COMPRESSED: + reader = ReaderCompressed.create(); + default: throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java index 10fdb2d8f1..82af28c0bf 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java @@ -19,10 +19,11 @@ package org.apache.sysds.runtime.io; -import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.common.Types.FileFormat; import org.apache.sysds.conf.CompilerConfig.ConfigType; +import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.compress.io.WriterCompressed; public class MatrixWriterFactory { @@ -85,6 +86,9 @@ public class MatrixWriterFactory else return new WriterHDF5((FileFormatPropertiesHDF5) props); + case COMPRESSED: + return WriterCompressed.create(props); + default: throw new DMLRuntimeException("Failed to create matrix writer for unknown format: " + fmt.toString()); } diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java new file mode 100644 index 0000000000..104794f4ff --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.compress.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.compress.io.ReaderCompressed; +import org.apache.sysds.runtime.compress.io.WriterCompressed; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.test.TestUtils; +import org.junit.AfterClass; +import org.junit.Test; + +public class IOTest { + + protected static final Log LOG = LogFactory.getLog(IOTest.class.getName()); + + final static Object lock = new Object(); + + final static String nameBeginning = "src/test/java/org/apache/sysds/test/component/compress/io/files/"; + + static AtomicInteger id = new AtomicInteger(0); + + public IOTest() { + synchronized(lock) { + new File(nameBeginning).mkdirs(); + } + } + + private static void deleteDirectory(File file) { + for(File subfile : file.listFiles()) { + if(subfile.isDirectory()) + deleteDirectory(subfile); + subfile.delete(); + } + file.delete(); + } + + @AfterClass + public static void 
cleanup() { + deleteDirectory(new File(nameBeginning)); + } + + public static String getName() { + return nameBeginning + "testWrite" + id.incrementAndGet() + ".cla"; + } + + @Test + public void testWrite() { + MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 3, 1, 3, 1.0, 2514)); + write(mb, getName()); + } + + @Test + public void testWriteAlreadyCompressed() { + MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 3, 1, 3, 1.0, 2514)); + MatrixBlock mb2 = CompressedMatrixBlockFactory.compress(mb).getLeft(); + write(mb2, getName()); + } + + @Test + public void testWriteAndRead() { + MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 3, 1, 3, 1.0, 2514)); + + String filename = getName(); + write(mb, filename); + MatrixBlock mbr = read(filename); + + assertEquals(mb.sum(), mbr.sum(), 0.0001); + assertEquals(mb.min(), mbr.min(), 0.0001); + assertEquals(mb.max(), mbr.max(), 0.0001); + assertEquals(mb.getNumRows(), mbr.getNumRows()); + assertEquals(mb.getNumColumns(), mbr.getNumColumns()); + assertTrue(mb.getInMemorySize() > mbr.getInMemorySize()); + assertTrue(mb.getExactSizeOnDisk() > mbr.getExactSizeOnDisk()); + + } + + @Test(expected = DMLCompressionException.class) + public void testWriteNotCompressable() throws Exception { + MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(3, 3, 1, 3, 1.0, 2514)); + WriterCompressed.writeCompressedMatrixToHDFS(mb, getName()); + } + + private static MatrixBlock read(String path) { + try { + return ReaderCompressed.readCompressedMatrixFromHDFS(path); + } + catch(Exception e) { + e.printStackTrace(); + fail("Failed to read file"); + return null; + } + } + + private static void write(MatrixBlock src, String path) { + try { + WriterCompressed.writeCompressedMatrixToHDFS(src, path); + } + catch(Exception e) { + e.printStackTrace(); + fail("Failed to write file"); + } + } +}
