This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new b6c076d  [SYSTEMDS-3147,3160,3161] Fix binary write block-size handling
b6c076d is described below

commit b6c076db6a812ce16f323fda4f0847660910c43f
Author: Matthias Boehm <[email protected]>
AuthorDate: Tue Oct 12 23:12:32 2021 +0200

    [SYSTEMDS-3147,3160,3161] Fix binary write block-size handling
    
    This patch fixes long-standing issues with writing binary matrices
    with non-default block sizes in special cases such as forced singlenode
    execution (no reblocks) and direct writes of persistent reads.
    
    For example, reading the Amazon books reviews dataset in binary format
    (blocksize 16K) and directly writing it as sparse text in singlenode
    execution took the 16K input object (never read) and set its blocksize
    to -1 (for text). The binary read (triggered by the persistent write)
    then used a default block size of 1K (for robustness) and thus
    misplaced values or ran into index-out-of-bounds errors.
    
    Instead of modifying the metadata in place before the write, or creating
    a shallow copy as the in-memory reblock would do, we now properly manage
    this metadata during the write process itself and leave the input
    metadata untouched. In contrast to the in-memory reblock, this approach
    avoids unnecessary evictions of large input matrices.
---
 .../runtime/controlprogram/caching/CacheableData.java     | 15 +++++++--------
 .../runtime/controlprogram/caching/MatrixObject.java      |  5 +++--
 .../runtime/instructions/cp/VariableCPInstruction.java    | 12 +++++-------
 .../org/apache/sysds/runtime/io/FileFormatProperties.java | 13 +++++++++++++
 .../sysds/runtime/io/FileFormatPropertiesLIBSVM.java      |  3 ++-
 5 files changed, 30 insertions(+), 18 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index e44d061..773c92c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -799,10 +799,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                exportData(fName, outputFormat, -1, formatProperties);
        }
        
-       public synchronized void exportData (String fName, String outputFormat, 
int replication, FileFormatProperties formatProperties) {
-               exportData(fName, outputFormat, replication, formatProperties, 
null);
-       }
-       
        /**
         * Synchronized because there might be parallel threads (parfor local) 
that
         * access the same object (in case it was created before the loop).
@@ -818,9 +814,8 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         * @param outputFormat format
         * @param replication ?
         * @param formatProperties file format properties
-        * @param opcode instruction opcode if available
         */
-       public synchronized void exportData (String fName, String outputFormat, 
int replication, FileFormatProperties formatProperties, String opcode) {
+       public synchronized void exportData (String fName, String outputFormat, 
int replication, FileFormatProperties formatProperties) {
                if( LOG.isTraceEnabled() )
                        LOG.trace("Export data "+hashCode()+" "+fName);
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -850,11 +845,13 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        setHDFSFileExists(true);
                
                //check for common file scheme (otherwise no copy/rename)
+               int blen = (formatProperties == null) ?
+                       ConfigurationManager.getBlocksize() : 
formatProperties.getBlocksize();
                boolean eqScheme = IOUtilFunctions.isSameFileScheme(
                        new Path(_hdfsFileName), new Path(fName));
                boolean eqFormat = isEqualOutputFormat(outputFormat);
-               boolean eqBlksize = (outputFormat == null || 
outputFormat.equals("binary"))
-                       && ConfigurationManager.getBlocksize() != 
getBlocksize();
+               boolean eqBlksize = (getBlocksize() != blen)
+                       && (outputFormat == null || 
outputFormat.equals("binary"));
                
                //actual export (note: no direct transfer of local copy in 
order to ensure blocking (and hence, parallelism))
                if( isDirty() || !eqScheme || isFederated() ||
@@ -1094,6 +1091,8 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                if ( fmt != FileFormat.MM ) {
                        // Get the dimension information from the metadata 
stored within MatrixObject
                        DataCharacteristics dc = iimd.getDataCharacteristics();
+                       if( formatProperties != null && 
formatProperties.knownBlocksize() )
+                               
dc.setBlocksize(formatProperties.getBlocksize());
                        
                        // when outputFormat is binaryblock, make sure that 
matrixCharacteristics has correct blocking dimensions
                        // note: this is only required if singlenode (due to 
binarycell default) 
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 3194aa8..4da0675 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -419,6 +419,7 @@ public class MatrixObject extends 
CacheableData<MatrixBlock> {
                long clen = dims[1];
                MetaDataFormat iimd = (MetaDataFormat) _metaData;
                DataCharacteristics mc = iimd.getDataCharacteristics();
+               System.out.println(mc);
                long begin = 0;
 
                if(LOG.isTraceEnabled()) {
@@ -549,8 +550,8 @@ public class MatrixObject extends 
CacheableData<MatrixBlock> {
                        DataCharacteristics mc = iimd.getDataCharacteristics();
                        // Write the matrix to HDFS in requested format
                        FileFormat fmt = (ofmt != null ? 
FileFormat.safeValueOf(ofmt) : iimd.getFileFormat());
-                       mc = (fmt == FileFormat.BINARY && mc.getBlocksize() > 
0) ? mc : new MatrixCharacteristics(mc)
-                               
.setBlocksize(ConfigurationManager.getBlocksize());
+                       if( fmt == FileFormat.BINARY && fprop != null )
+                               mc = new 
MatrixCharacteristics(mc).setBlocksize(fprop.getBlocksize());
                        DataConverter.writeMatrixToHDFS(_data, fname, fmt, mc, 
rep, fprop, _diag);
 
                        if(LOG.isTraceEnabled())
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index fdd7bf5..3965656 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -475,10 +475,10 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                                return new 
VariableCPInstruction(VariableOperationCode.CreateVariable,
                                        in1, in2, in3, iimd, updateType, 
fmtProperties, schema, opcode, str);
                        }
-
                        else {
                                return new 
VariableCPInstruction(VariableOperationCode.CreateVariable, in1, in2, in3, 
iimd, updateType, schema, opcode, str);
                        }
+                       
                case AssignVariable:
                        in1 = new CPOperand(parts[1]);
                        in2 = new CPOperand(parts[2]);
@@ -997,17 +997,15 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                                writeMMFile(ec, fname);
                        else if( fmt == FileFormat.CSV )
                                writeCSVFile(ec, fname);
-                       else if(fmt == FileFormat.LIBSVM)
-                       writeLIBSVMFile(ec, fname);
+                       else if(fmt == FileFormat.LIBSVM)
+                               writeLIBSVMFile(ec, fname);
                        else if(fmt == FileFormat.HDF5)
                                writeHDF5File(ec, fname);
                        else {
-                               // Default behavior
+                               // Default behavior (text, binary)
                                MatrixObject mo = 
ec.getMatrixObject(getInput1().getName());
                                int blen = 
Integer.parseInt(getInput4().getName());
-                               if( mo.getBlocksize() != blen )
-                                       
mo.getMetaData().getDataCharacteristics().setBlocksize(blen);
-                               mo.exportData(fname, fmtStr, _formatProperties);
+                               mo.exportData(fname, fmtStr, new 
FileFormatProperties(blen));
                        }
                        // Set privacy constraint of write instruction to the 
same as that of the input
                        
setPrivacyConstraint(ec.getMatrixObject(getInput1().getName()).getPrivacyConstraint());
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java 
b/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java
index fe51f10..178f571 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FileFormatProperties.java
@@ -22,9 +22,22 @@ package org.apache.sysds.runtime.io;
 public class FileFormatProperties 
 {
        private String description;
+       private final int _blen;
        
        public FileFormatProperties() {
+               this(-1);
+       }
+       
+       public FileFormatProperties(int blen) {
+               _blen = blen;
+       }
+       
+       public int getBlocksize() {
+               return _blen;
+       }
        
+       public boolean knownBlocksize() {
+               return _blen != -1;
        }
 
        public String getDescription() {
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java 
b/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java
index 7ebb455..05afda7 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FileFormatPropertiesLIBSVM.java
@@ -79,7 +79,8 @@ public class FileFormatPropertiesLIBSVM extends 
FileFormatProperties implements
                return sparse;
        }
 
-       @Override public String toString() {
+       @Override
+       public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(" delim " + delim);
                sb.append(" indexDelim " + indexDelim);

Reply via email to