This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 6f3cdb389d [SYSTEMDS-3932] CSV reader for out-of-core streams
6f3cdb389d is described below
commit 6f3cdb389d15ae577238a9ee0c1d3209453d9c98
Author: Jannik Lindemann <[email protected]>
AuthorDate: Sun Nov 16 08:54:54 2025 +0100
[SYSTEMDS-3932] CSV reader for out-of-core streams
Closes #2352.
---
.../java/org/apache/sysds/lops/CSVReBlock.java | 4 +-
.../runtime/instructions/OOCInstructionParser.java | 3 +
.../instructions/ooc/CSVReblockOOCInstruction.java | 88 ++++++
.../instructions/ooc/OOCEvictionManager.java | 9 +-
.../sysds/runtime/io/ReaderTextCSVParallel.java | 349 ++++++++++++++++++---
.../sysds/test/functions/ooc/CSVReaderTest.java | 140 +++++++++
src/test/scripts/functions/ooc/CSVReader.dml | 23 ++
7 files changed, 558 insertions(+), 58 deletions(-)
diff --git a/src/main/java/org/apache/sysds/lops/CSVReBlock.java
b/src/main/java/org/apache/sysds/lops/CSVReBlock.java
index b554b78771..5820a732ee 100644
--- a/src/main/java/org/apache/sysds/lops/CSVReBlock.java
+++ b/src/main/java/org/apache/sysds/lops/CSVReBlock.java
@@ -44,8 +44,8 @@ public class CSVReBlock extends Lop
_blocksize = blen;
- if(et == ExecType.SPARK) {
- lps.setProperties( inputs, ExecType.SPARK);
+ if(et == ExecType.SPARK || et == ExecType.OOC) {
+ lps.setProperties( inputs, et );
}
else {
throw new LopsException("Incorrect execution type for
CSVReblock:" + et);
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
index 4e9a92ecb7..03c806b09a 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java
@@ -25,6 +25,7 @@ import org.apache.sysds.common.InstructionType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.instructions.ooc.AggregateUnaryOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction;
+import org.apache.sysds.runtime.instructions.ooc.CSVReblockOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction;
import org.apache.sysds.runtime.instructions.ooc.OOCInstruction;
@@ -56,6 +57,8 @@ public class OOCInstructionParser extends InstructionParser {
switch(ooctype) {
case Reblock:
return
ReblockOOCInstruction.parseInstruction(str);
+ case CSVReblock:
+ return
CSVReblockOOCInstruction.parseInstruction(str);
case AggregateUnary:
return
AggregateUnaryOOCInstruction.parseInstruction(str);
case Unary:
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java
new file mode 100644
index 0000000000..a4f8c49705
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.ooc;
+
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
+import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
+import org.apache.sysds.runtime.io.ReaderTextCSVParallel;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+
+public class CSVReblockOOCInstruction extends ComputationOOCInstruction {
+ private final int blen;
+
+ private CSVReblockOOCInstruction(Operator op, CPOperand in, CPOperand
out, int blocklength, String opcode,
+ String instr) {
+ super(OOCType.Reblock, op, in, out, opcode, instr);
+ blen = blocklength;
+ }
+
+ public static CSVReblockOOCInstruction parseInstruction(String str) {
+ String[] parts =
InstructionUtils.getInstructionPartsWithValueType(str);
+ String opcode = parts[0];
+ if(!opcode.equals(Opcodes.CSVRBLK.toString()))
+ throw new DMLRuntimeException("Incorrect opcode for
CSVReblockOOCInstruction:" + opcode);
+
+ CPOperand in = new CPOperand(parts[1]);
+ CPOperand out = new CPOperand(parts[2]);
+ int blen = Integer.parseInt(parts[3]);
+ return new CSVReblockOOCInstruction(null, in, out, blen,
opcode, str);
+ }
+
+ @Override
+ public void processInstruction(ExecutionContext ec) {
+ MatrixObject min = ec.getMatrixObject(input1);
+ DataCharacteristics mc =
ec.getDataCharacteristics(input1.getName());
+ DataCharacteristics mcOut =
ec.getDataCharacteristics(output.getName());
+ mcOut.set(mc.getRows(), mc.getCols(), blen, mc.getNonZeros());
+
+ OOCStream<IndexedMatrixValue> qOut = createWritableStream();
+ addOutStream(qOut);
+
+ FileFormatProperties props = min.getFileFormatProperties();
+ final FileFormatPropertiesCSV csvProps = props instanceof
FileFormatPropertiesCSV ? (FileFormatPropertiesCSV) props
+ : new FileFormatPropertiesCSV();
+
+ final ReaderTextCSVParallel reader = new
ReaderTextCSVParallel(csvProps);
+ final String fileName = min.getFileName();
+ final long rows = mc.getRows();
+ final long cols = mc.getCols();
+ final long nnz = mc.getNonZeros();
+
+ submitOOCTask(() -> {
+ try {
+ reader.readMatrixAsStream(qOut, fileName, rows,
cols, blen, nnz);
+ }
+ catch(Exception ex) {
+ throw (ex instanceof DMLRuntimeException) ?
(DMLRuntimeException) ex : new DMLRuntimeException(ex);
+ }
+ }, qOut);
+
+ MatrixObject mout = ec.getMatrixObject(output);
+ mout.setStreamHandle(qOut);
+ }
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
index 1d47ac10dc..f5ae7573b0 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
@@ -135,12 +135,12 @@ public class OOCEvictionManager {
private static class partitionFile {
final String filePath;
- final long streamId;
+ //final long streamId;
private partitionFile(String filePath, long streamId) {
this.filePath = filePath;
- this.streamId = streamId;
+ //this.streamId = streamId;
}
}
@@ -152,13 +152,13 @@ public class OOCEvictionManager {
private BlockState state = BlockState.HOT;
private IndexedMatrixValue value;
private final long streamId;
- private final int blockId;
+ //private final int blockId;
private final long size;
BlockEntry(IndexedMatrixValue value, long streamId, int
blockId, long size) {
this.value = value;
this.streamId = streamId;
- this.blockId = blockId;
+ //this.blockId = blockId;
this.size = size;
}
}
@@ -437,6 +437,7 @@ public class OOCEvictionManager {
return mb.getExactSerializedSize();
}
+ @SuppressWarnings("unused")
private static Map.Entry<String, BlockEntry> removeFirstFromCache() {
synchronized (_cacheLock) {
diff --git
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
index 5b297a5d53..3e9a7a881a 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
@@ -22,9 +22,12 @@ package org.apache.sysds.runtime.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
@@ -43,8 +46,11 @@ import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseRow;
+import org.apache.sysds.runtime.instructions.ooc.OOCStream;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.io.IOUtilFunctions.CountRowsTask;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.UtilFunctions;
@@ -65,6 +71,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
protected int _rLen;
protected int _cLen;
protected JobConf _job;
+ protected boolean _streamSparse = false;
public ReaderTextCSVParallel(FileFormatPropertiesCSV props) {
_numThreads = OptimizerUtils.getParallelTextReadParallelism();
@@ -97,7 +104,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
MatrixBlock ret =
computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen,
estnnz);
// Second Read Pass (read, parse strings, append to matrix
block)
- readCSVMatrixFromHDFS(splits, path, ret);
+ readCSVMatrixFromHDFS(splits, path, ret, null);
// post-processing (representation-specific, change of
sparse/dense block representation)
// - no sorting required for CSV because it is read in sorted
order per row
@@ -112,6 +119,53 @@ public class ReaderTextCSVParallel extends MatrixReader {
return ret;
}
+ public MatrixBlock readMatrixAsStream(OOCStream<IndexedMatrixValue>
outStream, String fname, long rlen, long clen,
+ int blen, long estnnz) throws IOException, DMLRuntimeException {
+ _bLen = blen;
+
+ // prepare file access
+ _job = new JobConf(ConfigurationManager.getCachedJobConf());
+
+ Path path = new Path(fname);
+ FileSystem fs = IOUtilFunctions.getFileSystem(path, _job);
+
+ FileInputFormat.addInputPath(_job, path);
+ TextInputFormat informat = new TextInputFormat();
+ informat.configure(_job);
+
+ InputSplit[] splits = informat.getSplits(_job, _numThreads);
+ splits = IOUtilFunctions.sortInputSplits(splits);
+
+ // check existence and non-empty file
+ checkValidInputFile(fs, path);
+
+ // count rows/cols to populate meta data and split offsets
+ long estnnz2;
+ ExecutorService pool = CommonThreadPool.get(_numThreads);
+ try {
+ estnnz2 = computeCSVSize(splits, path, rlen, clen,
estnnz, pool);
+ }
+ catch(Exception e) {
+ throw new IOException("Thread pool Error " +
e.getMessage(), e);
+ }
+ finally {
+ pool.shutdown();
+ }
+
+ _streamSparse = MatrixBlock.evalSparseFormatInMemory(_rLen,
_cLen, estnnz2);
+
+ // stream CSV into blen x blen blocks
+ try {
+ BlockBuffer buffer = new BlockBuffer(outStream,
_streamSparse);
+ readCSVMatrixFromHDFS(splits, path, null, buffer);
+ buffer.flushRemaining();
+ }
+ finally {
+ outStream.closeInput();
+ }
+ return null;
+ }
+
@Override
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen,
long clen, int blen, long estnnz)
throws IOException, DMLRuntimeException {
@@ -119,7 +173,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
return new ReaderTextCSV(_props).readMatrixFromInputStream(is,
rlen, clen, blen, estnnz);
}
- private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path,
MatrixBlock dest) throws IOException {
+ private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path,
MatrixBlock dest, BlockBuffer streamBuffer)
+ throws IOException {
FileInputFormat.addInputPath(_job, path);
TextInputFormat informat = new TextInputFormat();
@@ -131,17 +186,19 @@ public class ReaderTextCSVParallel extends MatrixReader {
// create read tasks for all splits
ArrayList<Callable<Long>> tasks = new ArrayList<>();
int splitCount = 0;
+ final boolean sparseOut = (streamBuffer != null) ?
streamBuffer.isSparseBlocks() :
+ dest.isInSparseFormat();
for(InputSplit split : splits) {
- if(dest.isInSparseFormat() &&
_props.getNAStrings() != null)
- tasks.add(new
CSVReadSparseNanTask(split, informat, dest, splitCount++));
- else if(dest.isInSparseFormat() &&
_props.getFillValue() == 0)
- tasks.add(new
CSVReadSparseNoNanTaskAndFill(split, informat, dest, splitCount++));
- else if(dest.isInSparseFormat())
- tasks.add(new
CSVReadSparseNoNanTask(split, informat, dest, splitCount++));
+ if(sparseOut && _props.getNAStrings() != null)
+ tasks.add(new
CSVReadSparseNanTask(split, informat, dest, splitCount++, streamBuffer));
+ else if(sparseOut && _props.getFillValue() == 0)
+ tasks.add(new
CSVReadSparseNoNanTaskAndFill(split, informat, dest, splitCount++,
streamBuffer));
+ else if(sparseOut)
+ tasks.add(new
CSVReadSparseNoNanTask(split, informat, dest, splitCount++, streamBuffer));
else if(_props.getNAStrings() != null)
- tasks.add(new
CSVReadDenseNanTask(split, informat, dest, splitCount++));
+ tasks.add(new
CSVReadDenseNanTask(split, informat, dest, splitCount++, streamBuffer));
else
- tasks.add(new
CSVReadDenseNoNanTask(split, informat, dest, splitCount++));
+ tasks.add(new
CSVReadDenseNoNanTask(split, informat, dest, splitCount++, streamBuffer));
}
// check return codes and aggregate nnz
@@ -149,7 +206,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
for(Future<Long> rt : pool.invokeAll(tasks))
lnnz += rt.get();
- dest.setNonZeros(lnnz);
+ if(dest != null)
+ dest.setNonZeros(lnnz);
}
catch(Exception e) {
throw new IOException("Thread pool issue, while
parallel read.", e);
@@ -159,6 +217,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
}
+
private MatrixBlock
computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits,
Path path, long rlen, long clen, int blen, long estnnz) throws
IOException, DMLRuntimeException {
_rLen = 0;
@@ -172,10 +231,29 @@ public class ReaderTextCSVParallel extends MatrixReader {
Future<MatrixBlock> ret = (rlen<0 || clen<0 ||
estnnz<0) ? null :
pool.submit(() -> createOutputMatrixBlock(rlen,
clen, blen, estnnz, true, true));
+ long estnnz2 = computeCSVSize(splits, path, rlen, clen,
estnnz, pool);
+ return (ret!=null) ? UtilFunctions.getSafe(ret) :
+ createOutputMatrixBlock(_rLen, _cLen, blen,
estnnz2, true, true);
+ }
+ catch(Exception e) {
+ throw new IOException("Thread pool Error " +
e.getMessage(), e);
+ }
+ finally{
+ pool.shutdown();
+ }
+ }
+
+ private long computeCSVSize(InputSplit[] splits,
+ Path path, long rlen, long clen, long estnnz, ExecutorService
pool) throws IOException {
+ _rLen = 0;
+ _cLen = 0;
+
+ // count rows in parallel per split
+ try {
FileInputFormat.addInputPath(_job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(_job);
-
+
// count number of entities in the first non-header row
LongWritable key = new LongWritable();
Text oneLine = new Text();
@@ -196,7 +274,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
tasks.add(new CountRowsTask(split, informat,
_job, hasHeader));
hasHeader = false;
}
-
+
// collect row counts for offset computation
// early error notify in case not all tasks successful
_offsets = new SplitOffsetInfos(tasks.size());
@@ -208,7 +286,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
_rLen = _rLen + lnrow;
i++;
}
-
+
// robustness for wrong dimensions which are already
compiled into the plan
if((rlen != -1 && _rLen != rlen) || (clen != -1 &&
_cLen != clen)) {
@@ -229,8 +307,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
// allocate target matrix block based on given size;
// need to allocate sparse as well since lock-free
insert into target
long estnnz2 = (estnnz < 0) ? (long) _rLen * _cLen :
estnnz;
- return (ret!=null) ? UtilFunctions.getSafe(ret) :
- createOutputMatrixBlock(_rLen, _cLen, blen,
estnnz2, true, true);
+ return estnnz2;
}
catch(Exception e) {
throw new IOException("Thread pool Error " +
e.getMessage(), e);
@@ -271,16 +348,19 @@ public class ReaderTextCSVParallel extends MatrixReader {
protected final InputSplit _split;
protected final TextInputFormat _informat;
protected final MatrixBlock _dest;
+ protected final BlockBuffer _streamBuffer;
protected final boolean _isFirstSplit;
protected final int _splitCount;
protected int _row = 0;
protected int _col = 0;
- public CSVReadTask(InputSplit split, TextInputFormat informat,
MatrixBlock dest, int splitCount) {
+ public CSVReadTask(InputSplit split, TextInputFormat informat,
MatrixBlock dest, int splitCount,
+ BlockBuffer buffer) {
_split = split;
_informat = informat;
_dest = dest;
+ _streamBuffer = buffer;
_isFirstSplit = (splitCount == 0);
_splitCount = splitCount;
}
@@ -335,24 +415,184 @@ public class ReaderTextCSVParallel extends MatrixReader {
+ value);
}
}
+
+ protected void finishRow(int row) {
+ if(_streamBuffer != null)
+ _streamBuffer.finishRow(row);
+ }
+ }
+
+ private interface RowWriter {
+ void set(int col, double value);
+ }
+
+ private static class DenseRowWriter implements RowWriter {
+ private final double[] _vals;
+ private final int _pos;
+
+ public DenseRowWriter(DenseBlock block, int row) {
+ _vals = block.values(row);
+ _pos = block.pos(row);
+ }
+
+ @Override
+ public void set(int col, double value) {
+ _vals[_pos + col] = value;
+ }
+ }
+
+ private static class SparseRowWriter implements RowWriter {
+ private final SparseRow _row;
+
+ public SparseRowWriter(SparseBlock block, int row) {
+ block.allocate(row);
+ _row = block.get(row);
+ }
+
+ @Override
+ public void set(int col, double value) {
+ _row.append(col, value);
+ }
+ }
+
+ private class BlockBuffer {
+ private final OOCStream<IndexedMatrixValue> _stream;
+ private final boolean _sparseBlocks;
+ private final int _numBlockCols;
+ private final ConcurrentHashMap<Integer, BlockRowState> _states
= new ConcurrentHashMap<>();
+
+ public BlockBuffer(OOCStream<IndexedMatrixValue> stream,
boolean sparseBlocks) {
+ _stream = stream;
+ _sparseBlocks = sparseBlocks;
+ _numBlockCols = Math.max(1, (int) Math.ceil((double)
_cLen / _bLen));
+ }
+
+ public boolean isSparseBlocks() {
+ return _sparseBlocks;
+ }
+
+ public RowWriter getRowWriter(int row) {
+ int brow = row / _bLen;
+ BlockRowState state = _states.computeIfAbsent(brow,
BlockRowState::new);
+ return state.createRowWriter(row % _bLen);
+ }
+
+ public void finishRow(int row) {
+ int brow = row / _bLen;
+ BlockRowState state = _states.get(brow);
+ if(state != null && state.finishRow()) {
+ if(_states.remove(brow, state))
+ state.flush(brow);
+ }
+ }
+
+ public void flushRemaining() {
+ for(Map.Entry<Integer, BlockRowState> entry :
_states.entrySet()) {
+ if(_states.remove(entry.getKey(),
entry.getValue()))
+ entry.getValue().flush(entry.getKey());
+ }
+ }
+
+ private class StreamRowWriter implements RowWriter {
+ private final BlockRowState _state;
+ private final int _rowInBlock;
+
+ public StreamRowWriter(BlockRowState state, int
rowInBlock) {
+ _state = state;
+ _rowInBlock = rowInBlock;
+ }
+
+ @Override
+ public void set(int col, double value) {
+ if(value == 0)
+ return;
+ int bcol = col / _bLen;
+ MatrixBlock block =
_state.getOrCreateBlock(bcol);
+ int localCol = col % _bLen;
+ if(_sparseBlocks) {
+ SparseBlock sb = block.getSparseBlock();
+ sb.allocate(_rowInBlock);
+ sb.get(_rowInBlock).append(localCol,
value);
+ }
+ else {
+ DenseBlock db = block.getDenseBlock();
+ double[] vals = db.values(_rowInBlock);
+ int pos = db.pos(_rowInBlock);
+ vals[pos + localCol] = value;
+ }
+ }
+ }
+
+ private class BlockRowState {
+ private final MatrixBlock[] _blocks;
+ private final int _rowsInBlock;
+ private final AtomicInteger _rowsCompleted = new
AtomicInteger();
+
+ public BlockRowState(int brow) {
+ _blocks = new MatrixBlock[_numBlockCols];
+ _rowsInBlock = Math.min(_bLen, _rLen - brow *
_bLen);
+ }
+
+ public RowWriter createRowWriter(int rowInBlock) {
+ return new StreamRowWriter(this, rowInBlock);
+ }
+
+ public boolean finishRow() {
+ return _rowsCompleted.incrementAndGet() ==
_rowsInBlock;
+ }
+
+ public void flush(int brow) {
+ for(int bci = 0; bci < _blocks.length; bci++) {
+ MatrixBlock block = _blocks[bci];
+ if(block == null)
+ continue;
+ block.recomputeNonZeros();
+ if(block.getNonZeros() == 0)
+ continue;
+ block.examSparsity();
+ MatrixIndexes idx = new
MatrixIndexes(brow + 1, bci + 1);
+ _stream.enqueue(new
IndexedMatrixValue(idx, block));
+ }
+ }
+
+ private MatrixBlock getOrCreateBlock(int bcol) {
+ MatrixBlock block = _blocks[bcol];
+ if(block == null) {
+ synchronized(this) {
+ block = _blocks[bcol];
+ if(block == null) {
+ int cols =
Math.min(_bLen, _cLen - bcol * _bLen);
+ block = new
MatrixBlock(_rowsInBlock, cols, _sparseBlocks);
+ if(_sparseBlocks)
+
block.allocateSparseRowsBlock();
+ else
+
block.allocateDenseBlock();
+ _blocks[bcol] = block;
+ }
+ }
+ }
+ return block;
+ }
+ }
}
private class CSVReadDenseNoNanTask extends CSVReadTask {
- public CSVReadDenseNoNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount) {
- super(split, informat, dest, splitCount);
+ public CSVReadDenseNoNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount,
+ BlockBuffer buffer) {
+ super(split, informat, dest, splitCount, buffer);
}
protected long parse(RecordReader<LongWritable, Text> reader,
LongWritable key, Text value) throws IOException {
- DenseBlock a = _dest.getDenseBlock();
+ DenseBlock a = (_streamBuffer == null) ?
_dest.getDenseBlock() : null;
double cellValue = 0;
long nnz = 0;
boolean noFillEmpty = false;
while(reader.next(key, value)) { // foreach line
final String cellStr = value.toString().trim();
- double[] avals = a.values(_row);
- int apos = a.pos(_row);
+ RowWriter rowWriter = (_streamBuffer != null) ?
+ _streamBuffer.getRowWriter(_row) : new
DenseRowWriter(a, _row);
final String[] parts = _cLen == 1 ? null :
IOUtilFunctions.split(cellStr,
_props.getDelim());
@@ -365,14 +605,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
else {
cellValue =
Double.parseDouble(part);
}
- if(cellValue != 0) {
- avals[apos+j] = cellValue;
+ rowWriter.set(j, cellValue);
+ if(cellValue != 0)
nnz++;
- }
}
// sanity checks (number of columns, fill
values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
+ finishRow(_row);
_row++;
}
@@ -383,20 +623,21 @@ public class ReaderTextCSVParallel extends MatrixReader {
private class CSVReadDenseNanTask extends CSVReadTask {
- public CSVReadDenseNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount) {
- super(split, informat, dest, splitCount);
+ public CSVReadDenseNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount,
+ BlockBuffer buffer) {
+ super(split, informat, dest, splitCount, buffer);
}
protected long parse(RecordReader<LongWritable, Text> reader,
LongWritable key, Text value) throws IOException {
- DenseBlock a = _dest.getDenseBlock();
+ DenseBlock a = (_streamBuffer == null) ?
_dest.getDenseBlock() : null;
double cellValue = 0;
boolean noFillEmpty = false;
long nnz = 0;
while(reader.next(key, value)) { // foreach line
String cellStr = value.toString().trim();
String[] parts = IOUtilFunctions.split(cellStr,
_props.getDelim());
- double[] avals = a.values(_row);
- int apos = a.pos(_row);
+ RowWriter rowWriter = (_streamBuffer != null) ?
+ _streamBuffer.getRowWriter(_row) : new
DenseRowWriter(a, _row);
for(int j = 0; j < _cLen; j++) { // foreach cell
String part = parts[j].trim();
if(part.isEmpty()) {
@@ -406,14 +647,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
else
cellValue =
UtilFunctions.parseToDouble(part, _props.getNAStrings());
- if(cellValue != 0) {
- avals[apos+j] = cellValue;
+ rowWriter.set(j, cellValue);
+ if(cellValue != 0)
nnz++;
- }
}
// sanity checks (number of columns, fill
values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
+ finishRow(_row);
_row++;
}
return nnz;
@@ -422,23 +663,23 @@ public class ReaderTextCSVParallel extends MatrixReader {
private class CSVReadSparseNanTask extends CSVReadTask {
- public CSVReadSparseNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount) {
- super(split, informat, dest, splitCount);
+ public CSVReadSparseNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount,
+ BlockBuffer buffer) {
+ super(split, informat, dest, splitCount, buffer);
}
protected long parse(RecordReader<LongWritable, Text> reader,
LongWritable key, Text value) throws IOException {
boolean noFillEmpty = false;
double cellValue = 0;
- final SparseBlock sb = _dest.getSparseBlock();
+ final SparseBlock sb = (_streamBuffer == null) ?
_dest.getSparseBlock() : null;
long nnz = 0;
while(reader.next(key, value)) {
final String cellStr = value.toString().trim();
final String[] parts =
IOUtilFunctions.split(cellStr, _props.getDelim());
_col = 0;
- sb.allocate(_row);
- SparseRow r = sb.get(_row);
-
+ RowWriter rowWriter = (_streamBuffer != null) ?
+ _streamBuffer.getRowWriter(_row) : new
SparseRowWriter(sb, _row);
for(String part : parts) {
part = part.trim();
if(part.isEmpty()) {
@@ -450,7 +691,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
if(cellValue != 0) {
- r.append(_col, cellValue);
+ rowWriter.set(_col, cellValue);
nnz++;
}
_col++;
@@ -460,6 +701,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
+ finishRow(_row);
_row++;
}
return nnz;
@@ -467,12 +709,13 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
private class CSVReadSparseNoNanTask extends CSVReadTask {
- public CSVReadSparseNoNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount) {
- super(split, informat, dest, splitCount);
+ public CSVReadSparseNoNanTask(InputSplit split, TextInputFormat
informat, MatrixBlock dest, int splitCount,
+ BlockBuffer buffer) {
+ super(split, informat, dest, splitCount, buffer);
}
protected long parse(RecordReader<LongWritable, Text> reader,
LongWritable key, Text value) throws IOException {
- final SparseBlock sb = _dest.getSparseBlock();
+ final SparseBlock sb = (_streamBuffer == null) ?
_dest.getSparseBlock() : null;
long nnz = 0;
double cellValue = 0;
boolean noFillEmpty = false;
@@ -480,8 +723,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
_col = 0;
final String cellStr = value.toString().trim();
final String[] parts =
IOUtilFunctions.split(cellStr, _props.getDelim());
- sb.allocate(_row);
- SparseRow r = sb.get(_row);
+ RowWriter rowWriter = (_streamBuffer != null) ?
+ _streamBuffer.getRowWriter(_row) : new
SparseRowWriter(sb, _row);
for(String part : parts) {
part = part.trim();
if(part.isEmpty()) {
@@ -493,7 +736,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
if(cellValue != 0) {
- r.append(_col, cellValue);
+ rowWriter.set(_col, cellValue);
nnz++;
}
_col++;
@@ -503,6 +746,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
+ finishRow(_row);
_row++;
}
return nnz;
@@ -511,25 +755,25 @@ public class ReaderTextCSVParallel extends MatrixReader {
private class CSVReadSparseNoNanTaskAndFill extends CSVReadTask {
public CSVReadSparseNoNanTaskAndFill(InputSplit split,
TextInputFormat informat, MatrixBlock dest,
- int splitCount) {
- super(split, informat, dest, splitCount);
+ int splitCount, BlockBuffer buffer) {
+ super(split, informat, dest, splitCount, buffer);
}
protected long parse(RecordReader<LongWritable, Text> reader,
LongWritable key, Text value) throws IOException {
- final SparseBlock sb = _dest.getSparseBlock();
+ final SparseBlock sb = (_streamBuffer == null) ?
_dest.getSparseBlock() : null;
long nnz = 0;
double cellValue = 0;
while(reader.next(key, value)) {
_col = 0;
final String cellStr = value.toString().trim();
final String[] parts =
IOUtilFunctions.split(cellStr, _props.getDelim());
- sb.allocate(_row);
- SparseRow r = sb.get(_row);
+ RowWriter rowWriter = (_streamBuffer != null) ?
+ _streamBuffer.getRowWriter(_row) : new
SparseRowWriter(sb, _row);
for(String part : parts) {
if(!part.isEmpty()) {
cellValue =
Double.parseDouble(part);
if(cellValue != 0) {
- r.append(_col,
cellValue);
+ rowWriter.set(_col,
cellValue);
nnz++;
}
}
@@ -538,6 +782,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
+ finishRow(_row);
_row++;
}
return nnz;
diff --git
a/src/test/java/org/apache/sysds/test/functions/ooc/CSVReaderTest.java
b/src/test/java/org/apache/sysds/test/functions/ooc/CSVReaderTest.java
new file mode 100644
index 0000000000..5f5f7fb42f
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/CSVReaderTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.ooc;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class CSVReaderTest extends AutomatedTestBase {
+ private final static String TEST_NAME1 = "CSVReader";
+ private final static String TEST_DIR = "functions/ooc/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + CSVReaderTest.class.getSimpleName() + "/";
+ private final static double eps = 1e-8;
+ private static final String INPUT_NAME = "X";
+ private static final String OUTPUT_NAME = "res";
+
+ private final static int maxVal = 7;
+ private final static double sparsity1 = 0.65;
+ private final static double sparsity2 = 0.05;
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+		TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
+ addTestConfiguration(TEST_NAME1, config);
+ }
+
+ @Test
+ public void testCSVReaderDense1() {
+ runCSVReaderTest(false, 1800, 1100);
+ }
+
+ @Test
+ public void testCSVReaderSparse1() {
+ runCSVReaderTest(true, 1800, 1100);
+ }
+
+ @Test
+ public void testCSVReaderDenseWide() {
+ runCSVReaderTest(false, 50, 12100);
+ }
+
+ @Test
+ public void testCSVReaderSparseWide() {
+ runCSVReaderTest(true, 500, 50000);
+ }
+
+ @Test
+ public void testCSVReaderDenseUltraWide() {
+ runCSVReaderTest(false, 50, 200000);
+ }
+
+ @Test
+ public void testCSVReaderDenseLarge() {
+ runCSVReaderTest(false, 750, 50000);
+ }
+
+ @Test
+ public void testCSVReaderSparseLarge() {
+ runCSVReaderTest(true, 500, 50000);
+ }
+
+ @Test
+ public void testCSVReaderDenseLarge2() {
+ runCSVReaderTest(false, 1200, 25000);
+ }
+
+ private void runCSVReaderTest(boolean sparse, int rows, int cols) {
+		Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
+
+ try {
+ getAndLoadTestConfiguration(TEST_NAME1);
+
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+		programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), output(OUTPUT_NAME)};
+
+ // 1. Generate the data in-memory as MatrixBlock objects
+		double[][] A_data = getRandomMatrix(rows, cols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7);
+
+ // 2. Convert the double arrays to MatrixBlock objects
+		MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data);
+
+ // 3. Create a binary matrix writer
+		MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.CSV);
+
+ // 4. Write matrix A to a binary SequenceFile
+		writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros());
+		HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
+			new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.CSV);
+
+ runTest(true, false, null, -1);
+
+ //compare results
+
+ // rerun without ooc flag
+		programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME),
+			output(OUTPUT_NAME + "_target")};
+ runTest(true, false, null, -1);
+
+ // compare matrices
+		MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000);
+		MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"), Types.FileFormat.BINARY, rows, cols, 1000);
+ TestUtils.compareMatrices(ret1, ret2, eps);
+ }
+ catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ resetExecMode(platformOld);
+ }
+ }
+}
diff --git a/src/test/scripts/functions/ooc/CSVReader.dml b/src/test/scripts/functions/ooc/CSVReader.dml
new file mode 100644
index 0000000000..12e5b02cd0
--- /dev/null
+++ b/src/test/scripts/functions/ooc/CSVReader.dml
@@ -0,0 +1,23 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+A = read($1);
+
+write(A, $2, format="binary");