This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new cee72fcd62 [SYSTEMDS-3795] Fix multi-threaded HDF5 readers/writers
cee72fcd62 is described below
commit cee72fcd62524cd6afb29208bdefdd54fa5fe5d0
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Nov 28 13:18:01 2024 +0100
[SYSTEMDS-3795] Fix multi-threaded HDF5 readers/writers
This patch fixes the existing multi-threaded HDF5 readers/writers by
adding (1) proper NNZ maintenance of the overall block, and (2) handling
of both single- and multi-part HDF5 files/directories.
---
.../org/apache/sysds/runtime/io/ReaderHDF5.java | 6 ++--
.../sysds/runtime/io/ReaderHDF5Parallel.java | 41 ++++++++++++++--------
.../sysds/runtime/io/WriterHDF5Parallel.java | 9 +----
.../sysds/test/functions/io/SeqParReadTest2.java | 4 +--
4 files changed, 32 insertions(+), 28 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
index ff2e6945d2..f65887a2cb 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
@@ -108,7 +108,7 @@ public class ReaderHDF5 extends MatrixReader {
//determine matrix size via additional pass if required
if(dest == null) {
- dest = computeHDF5Size(files, fs, datasetName);
+ dest = computeHDF5Size(files, fs, datasetName, rlen*clen);
clen = dest.getNumColumns();
rlen = dest.getNumRows();
}
@@ -169,7 +169,7 @@ public class ReaderHDF5 extends MatrixReader {
return lnnz;
}
- public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName)
+ public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName, long estnnz)
throws IOException, DMLRuntimeException
{
int nrow = 0;
@@ -186,6 +186,6 @@ public class ReaderHDF5 extends MatrixReader {
IOUtilFunctions.closeSilently(bis);
}
// allocate target matrix block based on given size;
- return createOutputMatrixBlock(nrow, ncol, nrow, (long) nrow * ncol, true, false);
+ return createOutputMatrixBlock(nrow, ncol, nrow, estnnz, true, true);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java
index 56c211d54c..658eb53826 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java
@@ -38,6 +38,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.io.hdf5.H5Constants;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.HDFSTool;
public class ReaderHDF5Parallel extends ReaderHDF5 {
@@ -46,7 +47,7 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
public ReaderHDF5Parallel(FileFormatPropertiesHDF5 props) {
super(props);
- _numThreads = OptimizerUtils.getParallelTextReadParallelism();
+ _numThreads = OptimizerUtils.getParallelBinaryReadParallelism();
}
@Override
@@ -69,26 +70,31 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
// allocate output matrix block
ArrayList<Path> files = new ArrayList<>();
files.add(path);
- MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName());
-
+ MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName(), estnnz);
+ int numParts = Math.min(files.size(), _numThreads);
+
//create and execute tasks
ExecutorService pool = CommonThreadPool.get(_numThreads);
try {
int bufferSize = (src.getNumColumns() * src.getNumRows()) * 8 + H5Constants.STATIC_HEADER_SIZE;
ArrayList<ReadHDF5Task> tasks = new ArrayList<>();
rlen = src.getNumRows();
- int blklen = (int) Math.ceil((double) rlen / _numThreads);
+ int blklen = (int) Math.ceil((double) rlen / numParts);
for(int i = 0; i < _numThreads & i * blklen < rlen; i++) {
int rl = i * blklen;
int ru = (int) Math.min((i + 1) * blklen, rlen);
- BufferedInputStream bis = new BufferedInputStream(fs.open(path), bufferSize);
+ Path newPath = HDFSTool.isDirectory(fs, path) ?
+ new Path(path, IOUtilFunctions.getPartFileName(i)) : path;
+ BufferedInputStream bis = new BufferedInputStream(fs.open(newPath), bufferSize);
//BufferedInputStream bis, String datasetName, MatrixBlock src, MutableInt rl, int ru
- tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru));
+ tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru, clen, blklen));
}
- for(Future<Object> task : pool.invokeAll(tasks))
- task.get();
+ long nnz = 0;
+ for(Future<Long> task : pool.invokeAll(tasks))
+ nnz += task.get();
+ src.setNonZeros(nnz);
return src;
}
@@ -102,31 +108,36 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
@Override
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
- throws IOException, DMLRuntimeException {
-
+ throws IOException, DMLRuntimeException
+ {
return new ReaderHDF5(_props).readMatrixFromInputStream(is, rlen, clen, blen, estnnz);
}
- private static class ReadHDF5Task implements Callable<Object> {
+ private static class ReadHDF5Task implements Callable<Long> {
private final BufferedInputStream _bis;
private final String _datasetName;
private final MatrixBlock _src;
private final int _rl;
private final int _ru;
+ private final long _clen;
+ private final int _blen;
- public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src, int rl, int ru) {
+ public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src,
+ int rl, int ru, long clen, int blen)
+ {
_bis = bis;
_datasetName = datasetName;
_src = src;
_rl = rl;
_ru = ru;
+ _clen = clen;
+ _blen = blen;
}
@Override
- public Object call() throws IOException {
- readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, 0, 0);
- return null;
+ public Long call() throws IOException {
+ return readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, _clen, _blen);
}
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
index e136c496b3..f8cb47acb2 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.common.Types;
@@ -80,13 +79,6 @@ public class WriterHDF5Parallel extends WriterHDF5 {
for(Future<Object> task : pool.invokeAll(tasks))
task.get();
-
- // delete crc files if written to local file system
- if(fs instanceof LocalFileSystem) {
- for(int i = 0; i < numThreads & i * blklen < rlen; i++)
- IOUtilFunctions
- .deleteCrcFilesFromLocalFileSystem(fs, new Path(path, IOUtilFunctions.getPartFileName(i)));
- }
}
catch(Exception e) {
throw new IOException("Failed parallel write of HDF5 output.", e);
@@ -115,6 +107,7 @@ public class WriterHDF5Parallel extends WriterHDF5 {
@Override
public Object call() throws IOException {
writeHDF5MatrixToFile(_path, _job, _fs, _src, _rl, _ru);
+ IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(_job, _path);
return null;
}
}
diff --git a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index 84dc72be0e..7f2aedfe0f 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -147,8 +147,8 @@ public class SeqParReadTest2 extends AutomatedTestBase {
{false, "binary", true, 0.1},
{true, "hdf5", false, 0.7},
{true, "hdf5", false, 0.1},
- //{true, "hdf5", true, 0.7}, //FIXME
- //{true, "hdf5", true, 0.1},
+ {true, "hdf5", true, 0.7},
+ {true, "hdf5", true, 0.1},
{true, "libsvm", false, 0.7},
{true, "libsvm", false, 0.1},
{true, "libsvm", true, 0.7},