This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new b202f0f  [SYSTEMDS-3145] Spark Memory Estimation of Compressed Objects
b202f0f is described below

commit b202f0f1356d537fde7ef3989ca0c1675772f52c
Author: baunsgaard <[email protected]>
AuthorDate: Sat Sep 25 21:47:25 2021 +0200

    [SYSTEMDS-3145] Spark Memory Estimation of Compressed Objects
    
    This commit fixes a bug where the memory estimate of compressed
    objects in Spark was corrupted by the soft reference to the
    uncompressed matrix blocks. To address this, we clear the soft
    reference once compression is done in Spark instructions.
    
    This commit also contains slight changes to logging when using a
    Spark instruction to compress: if trace logging is enabled on the
    Spark compression instruction, compression information is printed.
    In addition, the output printed by each compression instruction is
    reduced.
---
 .../runtime/compress/CompressedMatrixBlock.java    |   2 +-
 .../compress/CompressedMatrixBlockFactory.java     | 113 +++++++++++----------
 .../runtime/compress/CompressionSettings.java      |  23 ++---
 .../compress/CompressionSettingsBuilder.java       |  13 ++-
 .../estim/CompressedSizeEstimatorFactory.java      |  12 ++-
 .../context/SparkExecutionContext.java             |   1 +
 .../spark/CompressionSPInstruction.java            |  51 +++++++---
 7 files changed, 131 insertions(+), 84 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 4ff5bcb..c4eb8fa 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -434,7 +434,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
                        nonZeros = cg.getNumberNonZeros();
                        // clear the soft reference to the decompressed 
version, since the one column group is perfectly,
                        // representing the decompressed version.
-                       decompressedVersion = null;
+                       clearSoftReferenceToDecompressed();
                }
                // serialize compressed matrix block
                out.writeInt(rlen);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index d77ab82..a1d802a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -227,6 +227,11 @@ public class CompressedMatrixBlockFactory {
                if(res == null)
                        return abortCompression();
 
+               if(compSettings.isInSparkInstruction){
+                       // clear soft reference to uncompressed block in case 
of spark.
+                       res.clearSoftReferenceToDecompressed();
+               }
+
                return new ImmutablePair<>(res, _stats);
        }
 
@@ -476,58 +481,64 @@ public class CompressedMatrixBlockFactory {
                setNextTimePhase(time.stop());
                DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), 
phase);
                if(LOG.isDebugEnabled()) {
-                       switch(phase) {
-                               case 0:
-                                       LOG.debug("--compression phase " + 
phase + " Classify  : " + getLastTimePhase());
-                                       LOG.debug("--Individual Columns 
Estimated Compression: " + _stats.estimatedSizeCols);
-                                       break;
-                               case 1:
-                                       LOG.debug("--compression phase " + 
phase + " Grouping  : " + getLastTimePhase());
-                                       LOG.debug("Grouping using: " + 
compSettings.columnPartitioner);
-                                       LOG.debug("Cost Calculated using: " + 
costEstimator);
-                                       LOG.debug("--Cocoded Columns estimated 
Compression:" + _stats.estimatedSizeCoCoded);
-                                       break;
-                               case 2:
-                                       LOG.debug("--compression phase " + 
phase + " Transpose : " + getLastTimePhase());
-                                       LOG.debug("Did transpose: " + 
compSettings.transposed);
-                                       break;
-                               case 3:
-                                       LOG.debug("--compression phase " + 
phase + " Compress  : " + getLastTimePhase());
-                                       LOG.debug("--compression Hash 
collisions:" + "(" + DblArrayIntListHashMap.hashMissCount + ","
-                                               + 
DoubleCountHashMap.hashMissCount + ")");
-                                       DblArrayIntListHashMap.hashMissCount = 
0;
-                                       DoubleCountHashMap.hashMissCount = 0;
-                                       LOG.debug("--compressed initial actual 
size:" + _stats.compressedInitialSize);
-                                       break;
-                               case 4:
-                                       LOG.debug("--compression phase " + 
phase + " Share     : " + getLastTimePhase());
-                                       break;
-                               case 5:
-                                       LOG.debug("--num col groups: " + 
res.getColGroups().size());
-                                       LOG.debug("--compression phase " + 
phase + " Cleanup   : " + getLastTimePhase());
-                                       LOG.debug("--col groups types " + 
_stats.getGroupsTypesString());
-                                       LOG.debug("--col groups sizes " + 
_stats.getGroupsSizesString());
-                                       LOG.debug("--dense size:        " + 
_stats.denseSize);
-                                       LOG.debug("--original size:     " + 
_stats.originalSize);
-                                       LOG.debug("--compressed size:   " + 
_stats.size);
-                                       LOG.debug("--compression ratio: " + 
_stats.getRatio());
-                                       LOG.debug("--Dense       ratio: " + 
_stats.getDenseRatio());
-                                       int[] lengths = new 
int[res.getColGroups().size()];
-                                       int i = 0;
-                                       for(AColGroup colGroup : 
res.getColGroups())
-                                               lengths[i++] = 
colGroup.getNumValues();
-
-                                       LOG.debug("--compressed colGroup 
dictionary sizes: " + Arrays.toString(lengths));
-                                       if(LOG.isTraceEnabled()) {
-                                               for(AColGroup colGroup : 
res.getColGroups()) {
-                                                       LOG.trace("--colGroups 
type       : " + colGroup.getClass().getSimpleName() + " size: "
-                                                               + 
colGroup.estimateInMemorySize()
-                                                               + ((colGroup 
instanceof ColGroupValue) ? "  numValues :"
-                                                                       + 
((ColGroupValue) colGroup).getNumValues() : "")
-                                                               + "  colIndexes 
: " + Arrays.toString(colGroup.getColIndices()));
+                       if(compSettings.isInSparkInstruction) {
+                               if(phase == 5)
+                                       LOG.debug(_stats);
+                       }
+                       else {
+                               switch(phase) {
+                                       case 0:
+                                               LOG.debug("--compression phase 
" + phase + " Classify  : " + getLastTimePhase());
+                                               LOG.debug("--Individual Columns 
Estimated Compression: " + _stats.estimatedSizeCols);
+                                               break;
+                                       case 1:
+                                               LOG.debug("--compression phase 
" + phase + " Grouping  : " + getLastTimePhase());
+                                               LOG.debug("Grouping using: " + 
compSettings.columnPartitioner);
+                                               LOG.debug("Cost Calculated 
using: " + costEstimator);
+                                               LOG.debug("--Cocoded Columns 
estimated Compression:" + _stats.estimatedSizeCoCoded);
+                                               break;
+                                       case 2:
+                                               LOG.debug("--compression phase 
" + phase + " Transpose : " + getLastTimePhase());
+                                               LOG.debug("Did transpose: " + 
compSettings.transposed);
+                                               break;
+                                       case 3:
+                                               LOG.debug("--compression phase 
" + phase + " Compress  : " + getLastTimePhase());
+                                               LOG.debug("--compression Hash 
collisions:" + "(" + DblArrayIntListHashMap.hashMissCount + ","
+                                                       + 
DoubleCountHashMap.hashMissCount + ")");
+                                               
DblArrayIntListHashMap.hashMissCount = 0;
+                                               
DoubleCountHashMap.hashMissCount = 0;
+                                               LOG.debug("--compressed initial 
actual size:" + _stats.compressedInitialSize);
+                                               break;
+                                       case 4:
+                                               LOG.debug("--compression phase 
" + phase + " Share     : " + getLastTimePhase());
+                                               break;
+                                       case 5:
+                                               LOG.debug("--num col groups: " 
+ res.getColGroups().size());
+                                               LOG.debug("--compression phase 
" + phase + " Cleanup   : " + getLastTimePhase());
+                                               LOG.debug("--col groups types " 
+ _stats.getGroupsTypesString());
+                                               LOG.debug("--col groups sizes " 
+ _stats.getGroupsSizesString());
+                                               LOG.debug("--dense size:        
" + _stats.denseSize);
+                                               LOG.debug("--original size:     
" + _stats.originalSize);
+                                               LOG.debug("--compressed size:   
" + _stats.size);
+                                               LOG.debug("--compression ratio: 
" + _stats.getRatio());
+                                               LOG.debug("--Dense       ratio: 
" + _stats.getDenseRatio());
+                                               int[] lengths = new 
int[res.getColGroups().size()];
+                                               int i = 0;
+                                               for(AColGroup colGroup : 
res.getColGroups())
+                                                       lengths[i++] = 
colGroup.getNumValues();
+
+                                               LOG.debug("--compressed 
colGroup dictionary sizes: " + Arrays.toString(lengths));
+                                               if(LOG.isTraceEnabled()) {
+                                                       for(AColGroup colGroup 
: res.getColGroups()) {
+                                                               
LOG.trace("--colGroups type       : " + colGroup.getClass().getSimpleName() + " 
size: "
+                                                                       + 
colGroup.estimateInMemorySize()
+                                                                       + 
((colGroup instanceof ColGroupValue) ? "  numValues :"
+                                                                               
+ ((ColGroupValue) colGroup).getNumValues() : "")
+                                                                       + "  
colIndexes : " + Arrays.toString(colGroup.getColIndices()));
+                                                       }
                                                }
-                                       }
-                               default:
+                                       default:
+                               }
                        }
                }
                phase++;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index c1a9cd4..c871a64 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -87,14 +87,10 @@ public class CompressionSettings {
         */
        public final EnumSet<CompressionType> validCompressions;
 
-       /**
-        * The minimum size of the sample extracted.
-        */
+       /** The minimum size of the sample extracted. */
        public final int minimumSampleSize;
 
-       /**
-        * The maximum size of the sample extracted.
-        */
+       /** The maximum size of the sample extracted. */
        public final int maxSampleSize;
 
        /** The sample type used for sampling */
@@ -108,15 +104,17 @@ public class CompressionSettings {
         */
        public boolean transposed = false;
 
-       /**
-        * The minimum compression ratio to achieve.
-        */
+       /** The minimum compression ratio to achieve. */
        public final double minimumCompressionRatio;
 
+       /** Is a spark instruction */
+       public final boolean isInSparkInstruction;
+
        protected CompressionSettings(double samplingRatio, boolean 
allowSharedDictionary, String transposeInput, int seed,
                boolean lossy, EnumSet<CompressionType> validCompressions, 
boolean sortValuesByLength,
-               PartitionerType columnPartitioner, int maxColGroupCoCode, 
double coCodePercentage, int minimumSampleSize, int maxSampleSize,
-               EstimationType estimationType, CostType costComputationType, 
double minimumCompressionRatio) {
+               PartitionerType columnPartitioner, int maxColGroupCoCode, 
double coCodePercentage, int minimumSampleSize,
+               int maxSampleSize, EstimationType estimationType, CostType 
costComputationType, double minimumCompressionRatio,
+               boolean isInSparkInstruction) {
                this.samplingRatio = samplingRatio;
                this.allowSharedDictionary = allowSharedDictionary;
                this.transposeInput = transposeInput;
@@ -128,10 +126,11 @@ public class CompressionSettings {
                this.maxColGroupCoCode = maxColGroupCoCode;
                this.coCodePercentage = coCodePercentage;
                this.minimumSampleSize = minimumSampleSize;
-               this.maxSampleSize= maxSampleSize;
+               this.maxSampleSize = maxSampleSize;
                this.estimationType = estimationType;
                this.costComputationType = costComputationType;
                this.minimumCompressionRatio = minimumCompressionRatio;
+               this.isInSparkInstruction = isInSparkInstruction;
                if(LOG.isDebugEnabled())
                        LOG.debug(this);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
 
b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index d5fd036..30bb1d1 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -47,6 +47,7 @@ public class CompressionSettingsBuilder {
        private PartitionerType columnPartitioner;
        private CostType costType;
        private double minimumCompressionRatio = 1.0;
+       private boolean isInSparkInstruction = false;
 
        public CompressionSettingsBuilder() {
 
@@ -293,6 +294,16 @@ public class CompressionSettingsBuilder {
        }
 
        /**
+        * Inform the compression that it is executed in a spark instruction.
+        * 
+        * @return The CompressionSettingsBuilder
+        */
+       public CompressionSettingsBuilder setIsInSparkInstruction() {
+               this.isInSparkInstruction = true;
+               return this;
+       }
+
+       /**
         * Create the CompressionSettings object to use in the compression.
         * 
         * @return The CompressionSettings
@@ -300,6 +311,6 @@ public class CompressionSettingsBuilder {
        public CompressionSettings create() {
                return new CompressionSettings(samplingRatio, 
allowSharedDictionary, transposeInput, seed, lossy,
                        validCompressions, sortValuesByLength, 
columnPartitioner, maxColGroupCoCode, coCodePercentage,
-                       minimumSampleSize, maxSampleSize, estimationType, 
costType, minimumCompressionRatio);
+                       minimumSampleSize, maxSampleSize, estimationType, 
costType, minimumCompressionRatio, isInSparkInstruction);
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index a9a8e44..8eb2da0 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -43,16 +43,19 @@ public class CompressedSizeEstimatorFactory {
                else {
                        if(shouldUseExactEstimator(cs, nRows, sampleSize, 
nnzRows)) {
                                if(sampleSize > nnzRows && nRows > 10000 && 
nCols > 10 && !cs.transposed) {
-                                       LOG.info("Transposing for exact 
estimator");
+                                       if(! cs.isInSparkInstruction)
+                                               LOG.info("Transposing for exact 
estimator");
                                        data = LibMatrixReorg.transpose(data,
                                                new 
MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), 
k);
                                        cs.transposed = true;
                                }
-                               LOG.info("Using Exact estimator");
+                               if(! cs.isInSparkInstruction)
+                                       LOG.info("Using Exact estimator");
                                return new CompressedSizeEstimatorExact(data, 
cs);
                        }
                        else {
-                               LOG.info("Trying sample size: " + sampleSize);
+                               if(! cs.isInSparkInstruction)
+                                       LOG.info("Trying sample size: " + 
sampleSize);
                                return tryToMakeSampleEstimator(data, cs, 
sampleRatio, sampleSize, nRows, nnzRows, k);
                        }
                }
@@ -64,7 +67,8 @@ public class CompressedSizeEstimatorFactory {
                CompressedSizeEstimatorSample estS = new 
CompressedSizeEstimatorSample(data, cs, sampleSize, k);
                int double_number = 1;
                while(estS.getSample() == null) {
-                       LOG.warn("Doubling sample size " + double_number++);
+                       if(! cs.isInSparkInstruction)
+                               LOG.warn("Doubling sample size " + 
double_number++);
                        sampleSize = sampleSize * 2;
                        if(shouldUseExactEstimator(cs, nRows, sampleSize, 
nnzRows))
                                return new CompressedSizeEstimatorExact(data, 
cs);
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 921219a..ca73700 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -269,6 +269,7 @@ public class SparkExecutionContext extends ExecutionContext
                final String threads = 
ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
                conf.setMaster("local[" + threads + "]");
                conf.setAppName("LocalSparkContextApp");
+               conf.set("spark.ui.showConsoleProgress", "false");
                conf.set("spark.ui.enabled", "false");
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java
index e6b62ee..49f7ba5 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java
@@ -25,9 +25,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.RDDInfo;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.compress.SingletonLookupHashMap;
 import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder;
+import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory.CostType;
 import org.apache.sysds.runtime.compress.cost.ICostEstimate;
 import org.apache.sysds.runtime.compress.workload.WTreeRoot;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -87,9 +91,25 @@ public class CompressionSPInstruction extends 
UnarySPInstruction {
                // execute compression
                JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
in.mapValues(mappingFunction);
                if(LOG.isTraceEnabled()) {
-                       out.checkpoint();
-                       LOG.trace("\nSpark compressed    : " + 
reduceSizes(out.mapValues(new SizeFunction()).collect())
-                               + "\nSpark uncompressed  : " + 
reduceSizes(in.mapValues(new SizeFunction()).collect()));
+                       in.persist(StorageLevel.MEMORY_AND_DISK());
+                       out.persist(StorageLevel.MEMORY_AND_DISK());
+                       long sparkSizeIn = 0;
+                       long sparkSizeOut = 0;
+                       long blockSizesIn = reduceSizes(in.mapValues(new 
SizeFunction()).collect());
+                       long blockSizesOut = reduceSizes(out.mapValues(new 
SizeFunction()).collect());
+                       for(RDDInfo info : 
sec.getSparkContext().sc().getRDDStorageInfo()) {
+                               if(info.id() == out.id())
+                                       sparkSizeOut = info.memSize();
+                               else if(info.id() == in.id())
+                                       sparkSizeIn = info.memSize();
+                       }
+                       StringBuilder sb = new StringBuilder();
+                       sb.append("Spark Compression Instruction sizes:");
+                       sb.append(String.format("\nSBCompress: InSize:       
%16d", sparkSizeIn));
+                       sb.append(String.format("\nSBCompress: InBlockSize:  
%16d", blockSizesIn));
+                       sb.append(String.format("\nSBCompress: OutSize:      
%16d", sparkSizeOut));
+                       sb.append(String.format("\nSBCompress: OutBlockSize: 
%16d", blockSizesOut));
+                       LOG.trace(sb.toString());
                }
 
                // set outputs
@@ -102,7 +122,9 @@ public class CompressionSPInstruction extends 
UnarySPInstruction {
 
                @Override
                public MatrixBlock call(MatrixBlock arg0) throws Exception {
-                       return 
CompressedMatrixBlockFactory.compress(arg0).getLeft();
+                       CompressionSettingsBuilder csb = new 
CompressionSettingsBuilder().setIsInSparkInstruction()
+                               .setCostType(CostType.MEMORY);
+                       return CompressedMatrixBlockFactory.compress(arg0, 
csb).getLeft();
                }
        }
 
@@ -117,13 +139,14 @@ public class CompressionSPInstruction extends 
UnarySPInstruction {
 
                @Override
                public MatrixBlock call(MatrixBlock arg0) throws Exception {
-                       ICostEstimate a = costBuilder.create(arg0.getNumRows(), 
arg0.getNumColumns());
-                       return CompressedMatrixBlockFactory.compress(arg0, 
InfrastructureAnalyzer.getLocalParallelism(), a)
+                       ICostEstimate ce = 
costBuilder.create(arg0.getNumRows(), arg0.getNumColumns());
+                       CompressionSettingsBuilder csb = new 
CompressionSettingsBuilder().setIsInSparkInstruction();
+                       return CompressedMatrixBlockFactory.compress(arg0, 
InfrastructureAnalyzer.getLocalParallelism(), csb, ce)
                                .getLeft();
                }
        }
 
-       public static class SizeFunction implements Function<MatrixBlock, 
Double> {
+       public static class SizeFunction implements Function<MatrixBlock, Long> 
{
                private static final long serialVersionUID = 1L;
 
                public SizeFunction() {
@@ -131,17 +154,15 @@ public class CompressionSPInstruction extends 
UnarySPInstruction {
                }
 
                @Override
-               public Double call(MatrixBlock arg0) throws Exception {
-                       return (double) arg0.getInMemorySize();
+               public Long call(MatrixBlock arg0) throws Exception {
+                       return arg0.getInMemorySize();
                }
        }
 
-       public static String reduceSizes(List<Tuple2<MatrixIndexes, Double>> 
in) {
-               double sum = 0;
-               for(Tuple2<MatrixIndexes, Double> e : in) {
+       public static Long reduceSizes(List<Tuple2<MatrixIndexes, Long>> in) {
+               long sum = 0;
+               for(Tuple2<MatrixIndexes, Long> e : in)
                        sum += e._2();
-               }
-
-               return "sum: " + sum + " mean: " + (sum / in.size());
+               return sum;
        }
 }

Reply via email to