This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new ffce8cedaf [MINOR] Fix Reset on Reading Dense MatrixBlock
ffce8cedaf is described below
commit ffce8cedafffbd92acb288a2afa05ff0ce493426
Author: baunsgaard <[email protected]>
AuthorDate: Sun Mar 19 13:59:00 2023 +0100
[MINOR] Fix Reset on Reading Dense MatrixBlock
This commit improves the reading of dense matrix blocks by removing
an unnecessary reset that was called during the read.
When reading an 80GB file from HDFS:
Before: 25.197 sec After: 23.816 sec
The fix is simple: when reading a dense block we called
allocateDenseBlock(false) and, based on its return value, reset
the block. But if the block is already allocated by allocateDenseBlock,
we do not need to reset it again afterwards (that would be redundant).
Closes #1791
---
.../org/apache/sysds/runtime/data/DenseBlock.java | 5 ++++
.../apache/sysds/runtime/data/DenseBlockBool.java | 10 ++++++++
.../apache/sysds/runtime/data/DenseBlockFP32.java | 25 +++++++++++++++++++
.../apache/sysds/runtime/data/DenseBlockFP64.java | 10 ++++++++
.../apache/sysds/runtime/data/DenseBlockInt32.java | 25 +++++++++++++++++++
.../apache/sysds/runtime/data/DenseBlockInt64.java | 26 +++++++++++++++++++
.../apache/sysds/runtime/data/DenseBlockLBool.java | 9 ++++---
.../apache/sysds/runtime/data/DenseBlockLDRB.java | 10 ++++++--
.../apache/sysds/runtime/data/DenseBlockLFP32.java | 12 ++++++---
.../sysds/runtime/data/DenseBlockString.java | 12 ++++++---
.../runtime/io/ReaderBinaryBlockParallel.java | 3 ++-
.../sysds/runtime/matrix/data/MatrixBlock.java | 29 ++++++++++++----------
.../tensor/DenseBlockConstIndexingTest.java | 21 +++++++++++-----
.../functions/async/LineageReuseSparkTest.java | 3 +++
14 files changed, 168 insertions(+), 32 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java
index c0833ef8ac..e4a55ab10c 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java
@@ -156,6 +156,11 @@ public abstract class DenseBlock implements Serializable,
Block
*/
public abstract void reset(int rlen, int[] odims, double v);
+ public final void resetNoFill(int rlen, int clen){
+ resetNoFill(rlen, new int[]{clen});
+ }
+
+ public abstract void resetNoFill(int rlen, int[] odims);
public static double estimateMemory(long nrows, long ncols){
long size = 16; // object
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockBool.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockBool.java
index 0c46c366fa..247526701a 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockBool.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockBool.java
@@ -86,6 +86,16 @@ public class DenseBlockBool extends DenseBlockDRB
_odims = odims;
}
+ @Override
+ public void resetNoFill(int rlen, int[] odims){
+ int len = rlen * odims[0];
+ if( len > capacity() )
+ _data = new BitSet(len);
+
+ _rlen = rlen;
+ _odims = odims;
+ }
+
@Override
public long capacity() {
return (_data!=null) ? _data.size() : -1;
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP32.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP32.java
index 1a3bc23400..f435efacbd 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP32.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP32.java
@@ -61,6 +61,31 @@ public class DenseBlockFP32 extends DenseBlockDRB
return ValueType.FP32 == vt;
}
+ @Override
+ public void reset(int rlen, int[] odims, double v) {
+ int len = rlen * odims[0];
+ if( len > capacity() ) {
+ _data = new float[len];
+ if( v != 0 )
+ Arrays.fill(_data,(float) v);
+ }
+ else {
+ Arrays.fill(_data, 0, len,(float) v);
+ }
+ _rlen = rlen;
+ _odims = odims;
+ }
+
+ @Override
+ public void resetNoFill(int rlen, int[] odims){
+ int len = rlen * odims[0];
+ if( len > capacity() )
+ _data = new float[len];
+
+ _rlen = rlen;
+ _odims = odims;
+ }
+
@Override
public long capacity() {
return (_data!=null) ? _data.length : -1;
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP64.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP64.java
index 916f315e03..44f8846ea9 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP64.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockFP64.java
@@ -71,6 +71,16 @@ public class DenseBlockFP64 extends DenseBlockDRB
_rlen = rlen;
_odims = odims;
}
+
+ @Override
+ public void resetNoFill(int rlen, int[] odims){
+ int len = rlen * odims[0];
+ if( len > capacity() )
+ _data = new double[len];
+
+ _rlen = rlen;
+ _odims = odims;
+ }
public static double estimateMemory(long nrows, long ncols) {
if( (double)nrows + ncols > Long.MAX_VALUE )
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt32.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt32.java
index 47e6a81700..428c0b8f88 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt32.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt32.java
@@ -61,6 +61,31 @@ public class DenseBlockInt32 extends DenseBlockDRB
return ValueType.INT32 == vt;
}
+ @Override
+ public void reset(int rlen, int[] odims, double v) {
+ int len = rlen * odims[0];
+ if(len > capacity()) {
+ _data = new int[len];
+ if(v != 0)
+ Arrays.fill(_data, (int) v);
+ }
+ else {
+ Arrays.fill(_data, 0, len, (int) v);
+ }
+ _rlen = rlen;
+ _odims = odims;
+ }
+
+ @Override
+ public void resetNoFill(int rlen, int[] odims) {
+ int len = rlen * odims[0];
+ if(len > capacity())
+ _data = new int[len];
+
+ _rlen = rlen;
+ _odims = odims;
+ }
+
@Override
public long capacity() {
return (_data!=null) ? _data.length : -1;
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt64.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt64.java
index 2be03705ca..d61d78e2b9 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt64.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockInt64.java
@@ -61,6 +61,32 @@ public class DenseBlockInt64 extends DenseBlockDRB
return ValueType.INT64 == vt;
}
+
+ @Override
+ public void reset(int rlen, int[] odims, double v) {
+ int len = rlen * odims[0];
+ if(len > capacity()) {
+ _data = new long[len];
+ if(v != 0)
+ Arrays.fill(_data, (long) v);
+ }
+ else {
+ Arrays.fill(_data, 0, len, (long) v);
+ }
+ _rlen = rlen;
+ _odims = odims;
+ }
+
+ @Override
+ public void resetNoFill(int rlen, int[] odims) {
+ int len = rlen * odims[0];
+ if(len > capacity())
+ _data = new long[len];
+
+ _rlen = rlen;
+ _odims = odims;
+ }
+
@Override
public long capacity() {
return (_data!=null) ? _data.length : -1;
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLBool.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLBool.java
index 1282d98bf2..e49b3a9b7c 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLBool.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLBool.java
@@ -20,14 +20,14 @@
package org.apache.sysds.runtime.data;
-import org.apache.sysds.common.Warnings;
+import java.util.BitSet;
+import java.util.stream.IntStream;
+
import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.common.Warnings;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
-import java.util.BitSet;
-import java.util.stream.IntStream;
-
public class DenseBlockLBool extends DenseBlockLDRB
{
private static final long serialVersionUID = 2604223782138590322L;
@@ -64,6 +64,7 @@ public class DenseBlockLBool extends DenseBlockLDRB
return ValueType.BOOLEAN == vt;
}
+
@Override
public boolean isContiguous() {
return _blocks.length == 1;
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
index 7b23c4c4a9..f903a06655 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
@@ -21,10 +21,11 @@
package org.apache.sysds.runtime.data;
-import org.apache.sysds.runtime.util.UtilFunctions;
-
import java.util.stream.IntStream;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.runtime.util.UtilFunctions;
+
/**
* Dense Large Row Blocks have multiple 1D arrays (blocks), which contain
complete rows.
* Except the last block all blocks have the same size (size refers to the
number of rows contained and space allocated).
@@ -90,6 +91,11 @@ public abstract class DenseBlockLDRB extends DenseBlock
_odims = odims;
}
+ @Override
+ public void resetNoFill(int rlen, int[] odims){
+ throw new NotImplementedException();
+ }
+
@Override
public int pos(int[] ix) {
int pos = pos(ix[0]);
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLFP32.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLFP32.java
index 753c7a8ea7..a6d82218f2 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLFP32.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLFP32.java
@@ -19,13 +19,14 @@
package org.apache.sysds.runtime.data;
-import org.apache.sysds.common.Warnings;
+import java.util.Arrays;
+
+import org.apache.commons.lang.NotImplementedException;
import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.common.Warnings;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
-import java.util.Arrays;
-
public class DenseBlockLFP32 extends DenseBlockLDRB
{
private static final long serialVersionUID = 2604223782138590322L;
@@ -62,6 +63,11 @@ public class DenseBlockLFP32 extends DenseBlockLDRB
return ValueType.FP32 == vt;
}
+ @Override
+ public void resetNoFill(int rlen, int[] odims){
+ throw new NotImplementedException();
+ }
+
@Override
public boolean isContiguous() {
return _blocks.length == 1;
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockString.java
b/src/main/java/org/apache/sysds/runtime/data/DenseBlockString.java
index 2bba6a759d..17688ca884 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockString.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockString.java
@@ -20,13 +20,14 @@
package org.apache.sysds.runtime.data;
-import org.apache.sysds.common.Warnings;
+import java.util.Arrays;
+
+import org.apache.commons.lang.NotImplementedException;
import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.common.Warnings;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
-import java.util.Arrays;
-
public class DenseBlockString extends DenseBlockDRB {
private static final long serialVersionUID = 7071870563356352352L;
@@ -61,6 +62,11 @@ public class DenseBlockString extends DenseBlockDRB {
return false;
}
+ @Override
+ public void resetNoFill(int rlen, int[] odims){
+ throw new NotImplementedException();
+ }
+
@Override
public long capacity() {
return (_data != null) ? _data.length : -1;
diff --git
a/src/main/java/org/apache/sysds/runtime/io/ReaderBinaryBlockParallel.java
b/src/main/java/org/apache/sysds/runtime/io/ReaderBinaryBlockParallel.java
index 96e010cdc4..55127ef2c4 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderBinaryBlockParallel.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
@@ -150,7 +151,7 @@ public class ReaderBinaryBlockParallel extends
ReaderBinaryBlock
long lnnz = 0; //aggregate block nnz
//directly read from sequence files (individual
partfiles)
- SequenceFile.Reader reader = new SequenceFile
+ final Reader reader = new SequenceFile
.Reader(_job, SequenceFile.Reader.file(_path));
try
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 923fecea8b..2ae883ee41 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -387,18 +387,21 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock<MatrixBlock>,
public boolean allocateDenseBlock(boolean clearNNZ) {
//allocate block if non-existing or too small (guaranteed to be
0-initialized),
long limit = (long)rlen * clen;
- boolean reset = (denseBlock == null || denseBlock.capacity() <
limit);
- if( denseBlock == null )
- denseBlock = DenseBlockFactory.createDenseBlock(rlen,
clen);
- else if( denseBlock.capacity() < limit )
- denseBlock.reset(rlen, clen);
-
//clear nnz if necessary
if( clearNNZ )
nonZeros = 0;
sparse = false;
+
+ if( denseBlock == null ){
+ denseBlock = DenseBlockFactory.createDenseBlock(rlen,
clen);
+ return true;
+ }
+ else if( denseBlock.capacity() < limit ){
+ denseBlock.reset(rlen, clen);
+ return true;
+ }
- return reset;
+ return false;
}
public final boolean allocateSparseRowsBlock() {
@@ -2012,13 +2015,13 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock<MatrixBlock>,
}
}
- private void readDenseBlock(DataInput in)
- throws IOException, DMLRuntimeException
- {
- if( !allocateDenseBlock(false) ) //allocate block
- denseBlock.reset(rlen, clen);
-
+ private void readDenseBlock(DataInput in) throws IOException,
DMLRuntimeException {
+ // allocate dense block resets the block if already allocated.
+ allocateDenseBlock(true);
DenseBlock a = getDenseBlock();
+ if(a.getDim(0) != rlen || a.getDim(1) != clen)
+ a.resetNoFill(rlen, clen); // reset the dimensions of a
if incorrect.
+
long nnz = 0;
if( in instanceof MatrixBlockDataInput ) { //fast deserialize
MatrixBlockDataInput mbin = (MatrixBlockDataInput)in;
diff --git
a/src/test/java/org/apache/sysds/test/component/tensor/DenseBlockConstIndexingTest.java
b/src/test/java/org/apache/sysds/test/component/tensor/DenseBlockConstIndexingTest.java
index 5806059cb4..05e011233d 100644
---
a/src/test/java/org/apache/sysds/test/component/tensor/DenseBlockConstIndexingTest.java
+++
b/src/test/java/org/apache/sysds/test/component/tensor/DenseBlockConstIndexingTest.java
@@ -19,18 +19,20 @@
package org.apache.sysds.test.component.tensor;
+import static org.junit.Assert.fail;
+
import org.apache.commons.lang.NotImplementedException;
-import org.junit.Assert;
-import org.junit.Test;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.DenseBlockFactory;
import org.apache.sysds.runtime.data.DenseBlockLBool;
import org.apache.sysds.runtime.data.DenseBlockLFP32;
import org.apache.sysds.runtime.data.DenseBlockLFP64;
-import org.apache.sysds.runtime.data.DenseBlockLString;
import org.apache.sysds.runtime.data.DenseBlockLInt32;
import org.apache.sysds.runtime.data.DenseBlockLInt64;
+import org.apache.sysds.runtime.data.DenseBlockLString;
+import org.junit.Assert;
+import org.junit.Test;
public class DenseBlockConstIndexingTest
{
@@ -81,9 +83,16 @@ public class DenseBlockConstIndexingTest
@Test
public void testIndexDenseBlock2StringConst() {
- DenseBlock db = getDenseBlock2(ValueType.STRING);
- db.set(new int[]{1,3}, "hello");
- Assert.assertEquals("hello", db.getString(new int[]{1,3}));
+ try{
+
+ DenseBlock db = getDenseBlock2(ValueType.STRING);
+ db.set(new int[]{1,3}, "hello");
+ Assert.assertEquals("hello", db.getString(new
int[]{1,3}));
+ }
+ catch(Exception e){
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
diff --git
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index 9ed59d75d4..ad131069c4 100644
---
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -74,6 +74,8 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
}
public void runTest(String testname, ExecMode execMode, int testId) {
+ setOutputBuffering(true);
+
boolean old_simplification =
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
boolean old_sum_product =
OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
boolean old_trans_exec_type =
OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;
@@ -92,6 +94,7 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
//proArgs.add("-explain");
proArgs.add("-stats");
+ proArgs.add("-explain");
proArgs.add("-args");
proArgs.add(output("R"));
programArgs = proArgs.toArray(new
String[proArgs.size()]);