This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 67bb8034611a766ca93c0314ee735a3252e79290 Author: baunsgaard <[email protected]> AuthorDate: Mon Aug 30 16:28:31 2021 +0200 [SYSTEMDS-2610] CLA updates - Compressed matrix factory improvements - Add decompression if the data is serialized and larger in compressed format - Decompress on write to HDFS - Abort compression after cocode if the compression sizes are bad - Make c bind decompressing in workload tree (worst case) - Add a minimum compression ratio argument to the CompressionSettings - Reduce the sampling size in c bind compression and set high minimum compression ratio - Fix order of operations in compressed append - Add compressed output size to unary hops - More utilization of the cached decompressed matrix if it fits in memory by looking for soft reference of uncompressed in certain cases --- src/main/java/org/apache/sysds/hops/UnaryOp.java | 12 +- .../ipa/IPAPassCompressionWorkloadAnalysis.java | 1 + .../hops/rewrite/RewriteCompressedReblock.java | 2 +- .../runtime/compress/CompressedMatrixBlock.java | 29 ++- .../compress/CompressedMatrixBlockFactory.java | 98 +++++++- .../runtime/compress/CompressionSettings.java | 8 +- .../compress/CompressionSettingsBuilder.java | 8 +- .../runtime/compress/cocode/AColumnCoCoder.java | 22 +- .../runtime/compress/cocode/CoCodeBinPacking.java | 4 +- .../runtime/compress/cocode/CoCodeGreedy.java | 2 +- .../runtime/compress/cocode/CoCodePriorityQue.java | 4 +- .../compress/colgroup/ColGroupUncompressed.java | 19 +- .../compress/colgroup/mapping/MapToFactory.java | 12 +- .../compress/cost/CostEstimatorBuilder.java | 5 + .../compress/cost/CostEstimatorFactory.java | 2 +- .../compress/estim/CompressedSizeEstimator.java | 43 +++- .../estim/CompressedSizeEstimatorExact.java | 5 +- .../estim/CompressedSizeEstimatorFactory.java | 30 ++- .../estim/CompressedSizeEstimatorSample.java | 11 +- .../compress/estim/CompressedSizeInfoColGroup.java | 2 +- .../sysds/runtime/compress/lib/CLALibAppend.java | 32 +-- .../runtime/compress/lib/CLALibBinaryCellOp.java | 42 ++-- .../sysds/runtime/compress/lib/CLALibCompAgg.java | 51 ++-- .../runtime/compress/lib/CLALibRelationalOp.java | 267 --------------------- .../sysds/runtime/compress/lib/CLALibScalar.java | 7 - .../compress/workload/WorkloadAnalyzer.java | 5 +- .../context/SparkExecutionContext.java | 8 +- .../spark/BinUaggChainSPInstruction.java | 6 + .../component/compress/CompressedMatrixTest.java | 33 +++ .../component/compress/CompressedTestBase.java | 27 --- .../component/compress/workload/WorkloadTest.java | 18 +- .../compress/workload/WorkloadAlgorithmTest.java | 2 +- 32 files changed, 379 insertions(+), 438 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java b/src/main/java/org/apache/sysds/hops/UnaryOp.java index b7df277..38199b2 100644 --- a/src/main/java/org/apache/sysds/hops/UnaryOp.java +++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java @@ -545,7 +545,7 @@ public class UnaryOp extends MultiThreadedHop setDim2(1); } else if(_op == OpOp1.TYPEOF || _op == OpOp1.DETECTSCHEMA || _op == OpOp1.COLNAMES) { - //TODO theses three builtins should rather be moved to unary aggregates + //TODO these three builtins should rather be moved to unary aggregates setDim1(1); setDim2(input.getDim2()); } @@ -564,6 +564,16 @@ public class UnaryOp extends MultiThreadedHop { setNnz( input.getNnz() ); } + + // if the input is compressed then set the output to be compressed as well. + if(input._compressedOutput && ! 
(_op==OpOp1.DECOMPRESS)){ + setCompressedOutput(true); + // Setting the compressed output to be 2 x larger. + // Just in case we change the compressed structure slightly. + // this value is overwritten with correct size once the hop is executed + // TODO handle overlapping state, since some operations would not lead to compressed output. + setCompressedSize(input.compressedSize() * 2); + } } } diff --git a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java index 324b272..b23397f 100644 --- a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java +++ b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java @@ -58,6 +58,7 @@ public class IPAPassCompressionWorkloadAnalysis extends IPAPass { WTreeRoot tree = e.getValue(); CostEstimatorBuilder b = new CostEstimatorBuilder(tree); // filter out compression plans that is known bad + if(b.shouldTryToCompress()){ tree.getRoot().setRequiresCompression(tree); for(Hop h : tree.getDecompressList()) diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java index 6b51a51..2f0f1f6c 100644 --- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java +++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java @@ -128,7 +128,7 @@ public class RewriteCompressedReblock extends StatementBlockRewriteRule { public static boolean satisfiesSizeConstraintsForCompression(Hop hop) { if(hop.getDim2() >= 1) { - return (hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 75; + return (hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 75 || (hop.getSparsity() < 0.0001 && hop.getDim1() > 1000); } return false; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index d374d3c..5dcc406 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -302,7 +302,10 @@ public class CompressedMatrixBlock extends MatrixBlock { return ret; } - private MatrixBlock getCachedDecompressed() { + /** + * Get the cached decompressed matrix (if it exists otherwise null) + */ + public MatrixBlock getCachedDecompressed() { if(decompressedVersion != null) { final MatrixBlock mb = decompressedVersion.get(); if(mb != null) { @@ -453,6 +456,16 @@ public class CompressedMatrixBlock extends MatrixBlock { @Override public void write(DataOutput out) throws IOException { + if(getExactSizeOnDisk() > MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros)) { + // decompress and make a uncompressed column group. this is then used for the serialization, since it is + // smaller. 
+ // throw new NotImplementedException("Decompressing serialization is not implemented"); + + MatrixBlock uncompressed = getUncompressed("Decompressing serialization for smaller serialization"); + ColGroupUncompressed cg = new ColGroupUncompressed(uncompressed); + allocateColGroup(cg); + nonZeros = cg.getNumberNonZeros(); + } // serialize compressed matrix block out.writeInt(rlen); out.writeInt(clen); @@ -492,11 +505,15 @@ public class CompressedMatrixBlock extends MatrixBlock { @Override public MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue, MatrixValue result) { - return CLALibBinaryCellOp.binaryOperations(op, this, thatValue, result); + MatrixBlock that = thatValue == null ? null : (MatrixBlock) thatValue; + MatrixBlock ret = result == null ? null : (MatrixBlock) result; + return CLALibBinaryCellOp.binaryOperations(op, this, that, ret); } public MatrixBlock binaryOperationsLeft(BinaryOperator op, MatrixValue thatValue, MatrixValue result) { - return CLALibBinaryCellOp.binaryOperationsLeft(op, this, thatValue, result); + MatrixBlock that = thatValue == null ? null : (MatrixBlock) thatValue; + MatrixBlock ret = result == null ? null : (MatrixBlock) result; + return CLALibBinaryCellOp.binaryOperationsLeft(op, this, that, ret); } @Override @@ -686,8 +703,8 @@ public class CompressedMatrixBlock extends MatrixBlock { .aggregateUnaryOperations(op, result, blen, indexesIn, inCP); } - - return CLALibCompAgg.aggregateUnary(this, result, op, blen, indexesIn, inCP); + MatrixBlock ret = (result == null) ? null : (MatrixBlock) result; + return CLALibCompAgg.aggregateUnary(this, ret, op, blen, indexesIn, inCP); } @Override @@ -1080,7 +1097,7 @@ public class CompressedMatrixBlock extends MatrixBlock { boolean m2C = m2 instanceof CompressedMatrixBlock; boolean m3C = m3 instanceof CompressedMatrixBlock; printDecompressWarning("aggregateTernaryOperations " + op.aggOp.getClass().getSimpleName() + " " - + op.indexFn.getClass().getSimpleName() + " " + op.aggOp.increOp.fn.getClass().getSimpleName() + " " + + op.indexFn.getClass().getSimpleName() + " " + op.aggOp.increOp.fn.getClass().getSimpleName() + " " + op.binaryFn.getClass().getSimpleName() + " m1,m2,m3 " + m1C + " " + m2C + " " + m3C); MatrixBlock left = getUncompressed(); MatrixBlock right1 = getUncompressed(m2); diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java index 20beaf5..5f45d56 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java @@ -32,18 +32,21 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup; import org.apache.sysds.runtime.compress.colgroup.ColGroupConst; import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty; import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed; import org.apache.sysds.runtime.compress.colgroup.ColGroupValue; import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory; import org.apache.sysds.runtime.compress.cost.ICostEstimate; -import org.apache.sysds.runtime.compress.cost.MemoryCostEstimator; import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator; import 
org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap; import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap; import org.apache.sysds.runtime.compress.workload.WTreeRoot; import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.utils.DMLCompressionStatistics; @@ -65,7 +68,7 @@ public class CompressedMatrixBlockFactory { private final CompressionSettings compSettings; /** The main cost estimator used for the compression */ private final ICostEstimate costEstimator; - + /** Time stamp of last phase */ private double lastPhase; /** Pointer to the original matrix Block that is about to be compressed. */ @@ -153,6 +156,20 @@ public class CompressedMatrixBlockFactory { } /** + * Generate a CompressedMatrixBlock Object that contains a single uncompressed matrix block column group. + * + * @param mb The matrix block to be contained in the uncompressed matrix block column, + * @return a CompressedMatrixBlock + */ + public static CompressedMatrixBlock genUncompressedCompressedMatrixBlock(MatrixBlock mb) { + CompressedMatrixBlock ret = new CompressedMatrixBlock(mb.getNumRows(), mb.getNumColumns()); + AColGroup cg = new ColGroupUncompressed(mb); + ret.allocateColGroup(cg); + ret.setNonZeros(mb.getNonZeros()); + return ret; + } + + /** * Method for constructing a compressed matrix out of an constant input. * * Since the input is a constant value it is trivially compressable, therefore we skip the entire compression @@ -191,9 +208,13 @@ public class CompressedMatrixBlockFactory { res = new CompressedMatrixBlock(mb); // copy metadata and allocate soft reference - classifyPhase(); - if(coCodeColGroups == null) - return abortCompression(); + looksLikeOneHot(); + + if(coCodeColGroups == null) { + classifyPhase(); + if(coCodeColGroups == null) + return abortCompression(); + } transposePhase(); compressPhase(); @@ -217,7 +238,13 @@ public class CompressedMatrixBlockFactory { _stats.estimatedSizeCols = sizeInfos.memoryEstimate(); logPhase(); - if(!(costEstimator instanceof MemoryCostEstimator) || _stats.estimatedSizeCols < _stats.originalSize) + final boolean isValidForComputeBasedCompression = isComputeBasedCompression() && + (compSettings.minimumCompressionRatio != 1.0) ? 
_stats.estimatedSizeCols * + compSettings.minimumCompressionRatio < _stats.originalSize : true; + final boolean isValidForMemoryBasedCompression = _stats.estimatedSizeCols * + compSettings.minimumCompressionRatio < _stats.originalSize; + + if(isValidForComputeBasedCompression || isValidForMemoryBasedCompression) coCodePhase(sizeEstimator, sizeInfos, costEstimator); else { LOG.info("Estimated Size of singleColGroups: " + _stats.estimatedSizeCols); @@ -225,13 +252,72 @@ public class CompressedMatrixBlockFactory { } } + private boolean isComputeBasedCompression() { + return costEstimator instanceof ComputationCostEstimator; + } + private void coCodePhase(CompressedSizeEstimator sizeEstimator, CompressedSizeInfo sizeInfos, ICostEstimate costEstimator) { coCodeColGroups = CoCoderFactory.findCoCodesByPartitioning(sizeEstimator, sizeInfos, k, costEstimator, compSettings); _stats.estimatedSizeCoCoded = coCodeColGroups.memoryEstimate(); + logPhase(); + + // if cocode is estimated larger than uncompressed abort compression. + if(isComputeBasedCompression() && + _stats.estimatedSizeCoCoded * compSettings.minimumCompressionRatio > _stats.originalSize) { + + coCodeColGroups = null; + LOG.info("Aborting compression because the cocoded size : " + _stats.estimatedSizeCoCoded); + LOG.info("Vs original size : " + _stats.originalSize); + } + + } + + private void looksLikeOneHot() { + final int numColumns = mb.getNumColumns(); + final int numRows = mb.getNumRows(); + final long nnz = mb.getNonZeros(); + final int colGroupSize = 100; + if(nnz == numRows) { + boolean onlyOneValues = true; + LOG.debug("Looks like one hot encoded."); + if(mb.isInSparseFormat()) { + final SparseBlock sb = mb.getSparseBlock(); + for(double v : sb.get(0).values()) { + onlyOneValues = v == 1.0; + if(!onlyOneValues) { + break; + } + } + } + else { + final double[] vals = mb.getDenseBlock().values(0); + for(int i = 0; i < Math.min(vals.length, 1000); i++) { + double v = vals[i]; + onlyOneValues = v == 1.0 || v == 0.0; + if(!onlyOneValues) { + break; + } + } + } + if(onlyOneValues) { + List<CompressedSizeInfoColGroup> ng = new ArrayList<>(numColumns / colGroupSize + 1); + for(int i = 0; i < numColumns; i += colGroupSize) { + int[] columnIds = new int[Math.min(colGroupSize, numColumns - i)]; + for(int j = 0; j < columnIds.length; j++) + columnIds[j] = i + j; + ng.add(new CompressedSizeInfoColGroup(columnIds, Math.min(numColumns, colGroupSize), numRows)); + } + coCodeColGroups = new CompressedSizeInfo(ng); + + LOG.debug("Concluded that it probably is one hot encoded skipping analysis"); + // skipping two phases + phase += 2; + } + } } private void transposePhase() { diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java index 6dfcdae..ea04e8e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java @@ -103,10 +103,15 @@ public class CompressionSettings { */ public boolean transposed = false; + /** + * The minimum compression ratio to achieve. 
+ */ + public final double minimumCompressionRatio; + protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput, int seed, boolean lossy, EnumSet<CompressionType> validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage, - int minimumSampleSize, EstimationType estimationType, CostType costComputationType) { + int minimumSampleSize, EstimationType estimationType, CostType costComputationType, double minimumCompressionRatio) { this.samplingRatio = samplingRatio; this.allowSharedDictionary = allowSharedDictionary; this.transposeInput = transposeInput; @@ -120,6 +125,7 @@ public class CompressionSettings { this.minimumSampleSize = minimumSampleSize; this.estimationType = estimationType; this.costComputationType = costComputationType; + this.minimumCompressionRatio = minimumCompressionRatio; if(LOG.isDebugEnabled()) LOG.debug(this); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java index a56285b..2864118 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java @@ -45,6 +45,7 @@ public class CompressionSettingsBuilder { private EstimationType estimationType = EstimationType.HassAndStokes; private PartitionerType columnPartitioner; private CostType costType; + private double minimumCompressionRatio = 1.0; public CompressionSettingsBuilder() { @@ -267,6 +268,11 @@ public class CompressionSettingsBuilder { return this; } + public CompressionSettingsBuilder setMinimumCompressionRatio(double ratio){ + this.minimumCompressionRatio = ratio; + return this; + } + /** * Create the CompressionSettings object to use in the compression. * @@ -275,6 +281,6 @@ public class CompressionSettingsBuilder { public CompressionSettings create() { return new CompressionSettings(samplingRatio, allowSharedDictionary, transposeInput, seed, lossy, validCompressions, sortValuesByLength, columnPartitioner, maxColGroupCoCode, coCodePercentage, - minimumSampleSize, estimationType, costType); + minimumSampleSize, estimationType, costType,minimumCompressionRatio); } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java index 41d986c..3bdcf5d 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java @@ -26,7 +26,6 @@ import org.apache.sysds.runtime.compress.cost.ICostEstimate; import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; -import org.apache.sysds.runtime.compress.utils.Util; public abstract class AColumnCoCoder { @@ -52,26 +51,17 @@ public abstract class AColumnCoCoder { */ protected abstract CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k); - protected CompressedSizeInfoColGroup join(CompressedSizeInfoColGroup lhs, CompressedSizeInfoColGroup rhs, - boolean analyze) { - return analyze ? 
joinWithAnalysis(lhs, rhs) : joinWithoutAnalysis(lhs, rhs); + protected CompressedSizeInfoColGroup join(int[] joined, CompressedSizeInfoColGroup lhs, + CompressedSizeInfoColGroup rhs, boolean analyze) { + return analyze ? _sest.estimateJoinCompressedSize(joined, lhs, rhs) : joinWithoutAnalysis(joined, lhs, rhs); } - protected CompressedSizeInfoColGroup joinWithAnalysis(CompressedSizeInfoColGroup lhs, + protected CompressedSizeInfoColGroup joinWithoutAnalysis(int[] joined, CompressedSizeInfoColGroup lhs, CompressedSizeInfoColGroup rhs) { - return _sest.estimateJoinCompressedSize(lhs, rhs); - } - - protected CompressedSizeInfoColGroup joinWithoutAnalysis(CompressedSizeInfoColGroup lhs, - CompressedSizeInfoColGroup rhs) { - int[] joined = Util.join(lhs.getColumns(), rhs.getColumns()); final int lhsV = lhs.getNumVals(); final int rhsV = rhs.getNumVals(); - final int numVals = lhsV * rhsV; - if(numVals < 0 || numVals > _sest.getNumRows()) - return null; - else - return new CompressedSizeInfoColGroup(joined, numVals, _sest.getNumRows()); + final int joinedMaxDistinct = (int) Math.min((long) lhsV * (long) rhsV, (long) _sest.getNumRows()); + return new CompressedSizeInfoColGroup(joined, joinedMaxDistinct, _sest.getNumRows()); } protected CompressedSizeInfoColGroup analyze(CompressedSizeInfoColGroup g) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java index 1887bdc..1731ef2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java @@ -141,7 +141,7 @@ public class CoCodeBinPacking extends AColumnCoCoder { for(int j = 0; j < bins.size(); j++) { double newBinWeight = binWeights[j] - c.getCardinalityRatio(); if(newBinWeight >= 0 && bins.get(j).getColumns().length < MAX_COL_PER_GROUP - 1) { - bins.set(j, joinWithoutAnalysis(bins.get(j), c)); + bins.set(j, joinWithoutAnalysis(Util.join(bins.get(j).getColumns(), c.getColumns()),bins.get(j), c)); binWeights[j] = newBinWeight; assigned = true; break; @@ -291,7 +291,7 @@ public class CoCodeBinPacking extends AColumnCoCoder { g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions); else { st3++; - g = _sest.estimateJoinCompressedSize(left, right); + g = _sest.estimateJoinCompressedSize(c, left, right); } if(leftConst || rightConst) diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java index 51bfa46..3091ba0 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java @@ -163,7 +163,7 @@ public class CoCodeGreedy extends AColumnCoCoder { g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions); else { st3++; - g = _sest.estimateJoinCompressedSize(left, right); + g = _sest.estimateJoinCompressedSize(c, left, right); } if(leftConst || rightConst) diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java index 2e0e7fb..27b678c 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java @@ -68,7 +68,7 @@ public class CoCodePriorityQue extends AColumnCoCoder { 
while(que.peek() != null) { CompressedSizeInfoColGroup r = que.poll(); - final CompressedSizeInfoColGroup g = joinWithAnalysis(l, r); + final CompressedSizeInfoColGroup g = _sest.estimateJoinCompressedSize(l, r); if(g != null) { final double costOfJoin = _cest.getCostOfCollectionOfGroups(que, g); if(costOfJoin < costBeforeJoin) { @@ -93,7 +93,7 @@ public class CoCodePriorityQue extends AColumnCoCoder { while(que.peek() != null) { CompressedSizeInfoColGroup r = que.peek(); if(_cest.shouldTryJoin(l, r)) { - CompressedSizeInfoColGroup g = joinWithAnalysis(l, r); + CompressedSizeInfoColGroup g = _sest.estimateJoinCompressedSize(l, r); if(g != null) { double costOfJoin = _cest.getCostOfColumnGroup(g); double costIndividual = _cest.getCostOfColumnGroup(l) + _cest.getCostOfColumnGroup(r); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java index ab94646..b1b0540 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java @@ -132,6 +132,23 @@ public class ColGroupUncompressed extends AColGroup { _data = data; } + /** + * Constructor for allocating a single uncompressed column group. + * + * @param data matrix block + */ + public ColGroupUncompressed(MatrixBlock data) { + super(generateColumnList(data.getNumColumns())); + _data = data; + } + + private static int[] generateColumnList(int nCol){ + int[] cols = new int[nCol]; + for(int i = 0; i< nCol; i++) + cols[i] = i; + return cols; + } + @Override public CompressionType getCompType() { return CompressionType.UNCOMPRESSED; @@ -632,7 +649,7 @@ public class ColGroupUncompressed extends AColGroup { double[] dv = colSum.getDenseBlockValues(); for(int i = 0; i < _colIndexes.length; i++) c[_colIndexes[i]] += dv[i]; - + } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java index 5a72ff7..6c0b9fa 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java @@ -108,16 +108,13 @@ public final class MapToFactory { final int nVL = left.getUnique(); final int nVR = right.getUnique(); final int size = left.size(); - final int maxUnique = nVL * nVR; + final long maxUnique = nVL * nVR; + if(maxUnique > (long)Integer.MAX_VALUE) + throw new DMLCompressionException("Joining impossible using linearized join, since each side has a large number of unique values"); if(size != right.size()) throw new DMLCompressionException("Invalid input maps to join, must contain same number of rows"); - try { - return computeJoin(left, right, size, nVL, maxUnique); - } - catch(Exception e) { - throw new DMLCompressionException("Joining failed max unique expected:" + maxUnique, e); - } + return computeJoin(left, right, size, nVL, (int)maxUnique); } private static AMapToData computeJoin(AMapToData left, AMapToData right, int size, int nVL, int maxUnique) { @@ -141,7 +138,6 @@ public final class MapToFactory { } tmp.setUnique(newUID-1); - // LOG.error(Arrays.toString(map)); return tmp; } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java index d7f305c..3d58bff 
100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java @@ -125,6 +125,11 @@ public final class CostEstimatorBuilder implements Serializable { numberOps += counter.scans + counter.leftMultiplications * 2 + counter.rightMultiplications + counter.compressedMultiplications * 4 + counter.dictionaryOps; numberOps -= counter.decompressions + counter.overlappingDecompressions; + + if(counter.decompressions > 1 && + counter.leftMultiplications + counter.rightMultiplications + counter.compressedMultiplications < 1) + // This condition is added for l2svm and mLogReg y dataset, that is compressing while it should not. + return false; return numberOps > 4; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java index e8bff06..83b794e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java @@ -45,7 +45,7 @@ public final class CostEstimatorFactory { return b.create(nRows, nCols); } else - return new DistinctCostEstimator(nRows, cs); + return new MemoryCostEstimator(); case MEMORY: default: return new MemoryCostEstimator(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java index 3dd092c..ef7d9dd 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java @@ -82,7 +82,7 @@ public abstract class CompressedSizeEstimator { return _numCols; } - public MatrixBlock getData(){ + public MatrixBlock getData() { return _data; } @@ -163,7 +163,7 @@ public abstract class CompressedSizeEstimator { * @param colIndexes The columns to group together inside a ColGroup * @return The CompressedSizeInformation associated with the selected ColGroups. */ - public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes){ + public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) { return estimateCompressedColGroupSize(colIndexes, 8, getNumRows()); } @@ -173,7 +173,7 @@ public abstract class CompressedSizeEstimator { * the number estimated in sub groups of the given colIndexes. * * @param colIndexes The columns to extract compression information from - * @param estimate An estimate of number of unique elements in these columns + * @param estimate An estimate of number of unique elements in these columns * @param nrUniqueUpperBound The upper bound of unique elements allowed in the estimate, can be calculated from the * number of unique elements estimated in sub columns multiplied together. This is * flexible in the sense that if the sample is small then this unique can be manually @@ -181,13 +181,16 @@ public abstract class CompressedSizeEstimator { * * @return The CompressedSizeInfoColGroup fro the given column indexes. */ - public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate, int nrUniqueUpperBound); + public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate, + int nrUniqueUpperBound); /** * Join two analyzed column groups together. 
without materializing the dictionaries of either side. * - * If either side was constructed without analysis then fall back to default materialization of double arrays. + * if the number of distinct elements in both sides multiplied is larger than Integer, return null. * + * If either side was constructed without analysis then fall back to default materialization of double arrays. + * O * @param g1 First group * @param g2 Second group * @return A joined compressed size estimation for the group. @@ -195,19 +198,39 @@ public abstract class CompressedSizeEstimator { public CompressedSizeInfoColGroup estimateJoinCompressedSize(CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2) { final int[] joined = Util.join(g1.getColumns(), g2.getColumns()); + return estimateJoinCompressedSize(joined, g1, g2); + } + + /** + * Join two analyzed column groups together. without materializing the dictionaries of either side. + * + * if the number of distinct elements in both sides multiplied is larger than Integer, return null. + * + * If either side was constructed without analysis then fall back to default materialization of double arrays. + * + * @param joined The joined column indexes. + * @param g1 First group + * @param g2 Second group + * @return A joined compressed size estimation for the group. + */ + public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1, + CompressedSizeInfoColGroup g2) { final int g1V = g1.getNumVals(); final int g2V = g2.getNumVals(); - if(g1V * g2V < 0 || g1V * g2V > getNumRows()) + if((long) g1V * g2V > (long) Integer.MAX_VALUE) return null; - else if((g1.getMap() == null && g2V != 0) || (g2.getMap() == null && g2V != 0)) - return estimateCompressedColGroupSize(joined, Math.max(g1V + 1, g2V+ 1), Math.min((g1V + 1) * (g2V + 1), getNumRows())); + + final int joinedMaxDistinct = (int) Math.min((long) g1V * (long) g2V, getNumRows()); + if((g1.getMap() == null && g2V != 0) || (g2.getMap() == null && g2V != 0)) + return estimateCompressedColGroupSize(joined, Math.max(g1V + 1, g2V + 1), + Math.min((g1V + 1) * (g2V + 1), getNumRows())); else - return estimateJoinCompressedSize(joined, g1, g2); + return estimateJoinCompressedSize(joined, g1, g2, joinedMaxDistinct); } protected abstract CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joinedcols, - CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2); + CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int joinedMaxDistinct); /** * Method used to extract the CompressedSizeEstimationFactors from an constructed UncompressedBitmap. 
Note this diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java index 824034a..1531781 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java @@ -45,8 +45,9 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator { } @Override - public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1, - CompressedSizeInfoColGroup g2) { + protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1, + CompressedSizeInfoColGroup g2, int joinedMaxDistinct) { + AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap()); EstimationFactors em = EstimationFactors.computeSizeEstimation(joined, map, _cs.validCompressions.contains(CompressionType.RLE), _numRows, false); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java index 8b42258..6e9208a 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java @@ -31,22 +31,31 @@ public class CompressedSizeEstimatorFactory { public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings cs, int k) { - final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows(); + final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows(); final int nCols = cs.transposed ? 
data.getNumRows() : data.getNumColumns(); final int nnzRows = (int) Math.ceil(data.getNonZeros() / nCols); final double sampleRatio = cs.samplingRatio; final int sampleSize = Math.min(getSampleSize(sampleRatio, nRows, cs.minimumSampleSize), maxSampleSize); - if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) { - if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 && !cs.transposed) { - data = LibMatrixReorg.transpose(data, - new MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), k); - cs.transposed = true; - } - return new CompressedSizeEstimatorExact(data, cs); + + if(nCols > 1000) { + return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize / 10, nRows, nnzRows, k); } else { - return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows, k); + if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) { + if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 && !cs.transposed) { + LOG.info("Transposing for exact estimator"); + data = LibMatrixReorg.transpose(data, + new MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), k); + cs.transposed = true; + } + LOG.info("Using Exact estimator"); + return new CompressedSizeEstimatorExact(data, cs); + } + else { + LOG.info("Trying sample size: " + sampleSize); + return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows, k); + } } } @@ -54,8 +63,9 @@ public class CompressedSizeEstimatorFactory { private static CompressedSizeEstimator tryToMakeSampleEstimator(MatrixBlock data, CompressionSettings cs, double sampleRatio, int sampleSize, int nRows, int nnzRows, int k) { CompressedSizeEstimatorSample estS = new CompressedSizeEstimatorSample(data, cs, sampleSize, k); + int double_number = 1; while(estS.getSample() == null) { - LOG.warn("Doubling sample size"); + LOG.error("Warining doubling sample size " + double_number++); sampleSize = sampleSize * 2; if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) return new CompressedSizeEstimatorExact(data, cs); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java index 24ee358..218e12b 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java @@ -107,18 +107,17 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { } @Override - public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1, - CompressedSizeInfoColGroup g2) { - final int g1V = g1.getMap().getUnique(); - final int g2V = g2.getMap().getUnique(); - final int nrUniqueUpperBound = g1V * g2V; + protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1, + CompressedSizeInfoColGroup g2, int joinedMaxDistinct) { + if((long)g1.getNumVals() * g2.getNumVals() >(long)Integer.MAX_VALUE ) + return null; final AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap()); EstimationFactors sampleFacts = EstimationFactors.computeSizeEstimation(joined, map, _cs.validCompressions.contains(CompressionType.RLE), map.size(), false); // result facts - EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, nrUniqueUpperBound); + EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, joinedMaxDistinct); return 
new CompressedSizeInfoColGroup(em, _cs.validCompressions, map); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java index e2b7491..1026aa2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java @@ -66,7 +66,7 @@ public class CompressedSizeInfoColGroup { _facts = new EstimationFactors(columns, numVals, numRows); _cardinalityRatio = (double) numVals / numRows; _sizes = null; - _bestCompressionType = null; + _bestCompressionType = CompressionType.DDC; _minSize = ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, 1.0, false); _map = null; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java index fc086e6..ab60d9f 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java @@ -22,12 +22,10 @@ package org.apache.sysds.runtime.compress.lib; import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; -import org.apache.sysds.runtime.compress.CompressionStatistics; import org.apache.sysds.runtime.compress.colgroup.AColGroup; import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -41,24 +39,18 @@ public class CLALibAppend { if(left.isEmpty() && right instanceof CompressedMatrixBlock) return appendLeftEmpty(left, (CompressedMatrixBlock) right); else if(right.isEmpty() && left instanceof CompressedMatrixBlock) - return appendRightEmpty((CompressedMatrixBlock)left, right); + return appendRightEmpty((CompressedMatrixBlock) left, right); final int m = left.getNumRows(); final int n = left.getNumColumns() + right.getNumColumns(); - // try to compress both sides (if not already compressed). if(!(left instanceof CompressedMatrixBlock) && m > 1000) { - LOG.warn("Compressing left for append operation"); - Pair<MatrixBlock, CompressionStatistics> x = CompressedMatrixBlockFactory.compress(left); - if(x.getRight().getRatio() > 3.0) - left = x.getLeft(); + LOG.info("Appending uncompressed column group left"); + left = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(left); } if(!(right instanceof CompressedMatrixBlock) && m > 1000) { - LOG.warn("Compressing right for append operation"); - Pair<MatrixBlock, CompressionStatistics> x = CompressedMatrixBlockFactory.compress(right); - if(x.getRight().getRatio() > 3.0) - right = x.getLeft(); + LOG.warn("Appending uncompressed column group right"); + right = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(right); } // if compression failed then use default append method.
@@ -72,14 +64,22 @@ public class CLALibAppend { CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n); ret = appendColGroups(ret, leftC.getColGroups(), rightC.getColGroups(), leftC.getNumColumns()); - return ret; + + double compressedSize = ret.getInMemorySize(); + double uncompressedSize = MatrixBlock.estimateSizeInMemory(m,n, ret.getSparsity()); + + + if(compressedSize * 10 < uncompressedSize) + return ret; + else + return ret.getUncompressed("Decompressing c bind matrix"); } private static MatrixBlock appendRightEmpty(CompressedMatrixBlock left, MatrixBlock right) { final int m = left.getNumRows(); final int n = left.getNumColumns() + right.getNumColumns(); - CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n); + CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n); List<AColGroup> newGroup = new ArrayList<>(1); newGroup.add(ColGroupEmpty.generate(right.getNumColumns(), right.getNumRows())); @@ -91,7 +91,7 @@ public class CLALibAppend { private static MatrixBlock appendLeftEmpty(MatrixBlock left, CompressedMatrixBlock right) { final int m = left.getNumRows(); final int n = left.getNumColumns() + right.getNumColumns(); - CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n); + CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n); List<AColGroup> newGroup = new ArrayList<>(1); newGroup.add(ColGroupEmpty.generate(left.getNumColumns(), left.getNumRows())); diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java index 79cf1c3..99d9c92 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java @@ -19,7 +19,6 @@ package org.apache.sysds.runtime.compress.lib; -import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -59,33 +58,41 @@ public class CLALibBinaryCellOp { private static final Log LOG = LogFactory.getLog(CLALibBinaryCellOp.class.getName()); - public static MatrixBlock binaryOperations(BinaryOperator op, CompressedMatrixBlock m1, MatrixValue thatValue, - MatrixValue result) { + public static MatrixBlock binaryOperations(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock thatValue, + MatrixBlock result) { MatrixBlock that = CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing right side in BinaryOps"); if(m1.getNumRows() <= 0) LOG.error(m1); - if(thatValue.getNumRows() <= 0) - LOG.error(thatValue); + if(that.getNumRows() <= 0) + LOG.error(that); LibMatrixBincell.isValidDimensionsBinary(m1, that); - thatValue = that; BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(m1, that); - return selectProcessingBasedOnAccessType(op, m1, thatValue, result, atype, false); + return selectProcessingBasedOnAccessType(op, m1, that, result, atype, false); } - public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatrixBlock m1, MatrixValue thatValue, - MatrixValue result) { + public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock thatValue, + MatrixBlock result) { MatrixBlock that = CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing left side in BinaryOps"); LibMatrixBincell.isValidDimensionsBinary(that, m1); thatValue = that; BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(that, m1); - return selectProcessingBasedOnAccessType(op, m1, thatValue, result, atype, 
true); + return selectProcessingBasedOnAccessType(op, m1, that, result, atype, true); } private static MatrixBlock selectProcessingBasedOnAccessType(BinaryOperator op, CompressedMatrixBlock m1, - MatrixValue thatValue, MatrixValue result, BinaryAccessType atype, boolean left) { - MatrixBlock that = (MatrixBlock) thatValue; - if(atype == BinaryAccessType.MATRIX_COL_VECTOR) - return binaryMVCol(m1, that, op, left); + MatrixBlock that, MatrixBlock result, BinaryAccessType atype, boolean left) { + if(atype == BinaryAccessType.MATRIX_COL_VECTOR) { + MatrixBlock d_compressed = m1.getCachedDecompressed(); + if(d_compressed != null) { + if(left) + return that.binaryOperations(op, d_compressed, result); + else + return d_compressed.binaryOperations(op, that, result); + } + else + return binaryMVCol(m1, that, op, left); + + } else if(atype == BinaryAccessType.MATRIX_MATRIX) { if(that.isEmpty()) { ScalarOperator sop = left ? new LeftScalarOperator(op.fn, 0, -1) : new RightScalarOperator(op.fn, 0, @@ -93,8 +100,7 @@ public class CLALibBinaryCellOp { return CLALibScalar.scalarOperations(sop, m1, result); } else { - SoftReference<MatrixBlock> msf = m1.getSoftReferenceToDecompressed(); - MatrixBlock d_compressed = msf != null ? msf.get() : null; + MatrixBlock d_compressed = m1.getCachedDecompressed(); if(d_compressed != null) { // copy the decompressed matrix if there is a decompressed matrix already. MatrixBlock tmp = d_compressed; @@ -117,7 +123,7 @@ public class CLALibBinaryCellOp { return bincellOp(m1, that, setupCompressedReturnMatrixBlock(m1, result), op, left); else { LOG.warn("Decompressing since Binary Ops" + op.fn + " is not supported compressed"); - return CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, thatValue, result); + return CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, that, result); } } @@ -295,7 +301,7 @@ public class CLALibBinaryCellOp { final int blkz = CompressionSettings.BITMAP_BLOCK_SZ; final int k = op.getNumThreads(); long nnz = 0; - ; + if(k <= 1) { for(int i = 0; i * blkz < m1.getNumRows(); i++) { if(left) diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java index 892ec95..5c9f7b2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java @@ -34,6 +34,7 @@ import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.compress.CompressionSettings; import org.apache.sysds.runtime.compress.DMLCompressionException; import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysds.runtime.functionobjects.Builtin; import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode; import org.apache.sysds.runtime.functionobjects.IndexFunction; @@ -50,7 +51,6 @@ import org.apache.sysds.runtime.matrix.data.LibMatrixAgg; import org.apache.sysds.runtime.matrix.data.LibMatrixBincell; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; -import org.apache.sysds.runtime.matrix.data.MatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; import org.apache.sysds.runtime.matrix.operators.AggregateOperator; import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; @@ -71,43 +71,64 @@ public class CLALibCompAgg { } }; - 
public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixValue result, + public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixBlock result, AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) { // prepare output dimensions CellIndex tempCellIndex = new CellIndex(-1, -1); op.indexFn.computeDimension(inputMatrix.getNumRows(), inputMatrix.getNumColumns(), tempCellIndex); + if(requireDecompression(inputMatrix, op)) { + // Decide if we should use the cached decompressed Version, or we should decompress. + final double denseSize = MatrixBlock.estimateSizeDenseInMemory(inputMatrix.getNumRows(), + inputMatrix.getNumColumns()); + final double currentSize = inputMatrix.getInMemorySize(); + final double localMaxMemory = InfrastructureAnalyzer.getLocalMaxMemory(); + + if(denseSize < 5 * currentSize && inputMatrix.getColGroups().size() > 5 && + denseSize <= localMaxMemory / 2) { + LOG.info("Decompressing for unaryAggregate because of overlapping state"); + inputMatrix.decompress(op.getNumThreads()); + } + MatrixBlock decomp = inputMatrix.getCachedDecompressed(); + if(decomp != null) + return decomp.aggregateUnaryOperations(op, result, blen, indexesIn, inCP); + } + // initialize and allocate the result if(result == null) result = new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false); else result.reset(tempCellIndex.row, tempCellIndex.column, false); - MatrixBlock ret = (MatrixBlock) result; - ret.allocateDenseBlock(); + result.allocateDenseBlock(); AggregateUnaryOperator opm = replaceKahnOperations(op); if(inputMatrix.getColGroups() != null) { - fillStart(ret, opm); + fillStart(result, opm); - if(inputMatrix.isOverlapping() && - (opm.aggOp.increOp.fn instanceof KahanPlusSq || (opm.aggOp.increOp.fn instanceof Builtin && - (((Builtin) opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN || - ((Builtin) opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) - aggregateUnaryOverlapping(inputMatrix, ret, opm, indexesIn, inCP); + if(requireDecompression(inputMatrix, opm)) + aggregateUnaryOverlapping(inputMatrix, result, opm, indexesIn, inCP); else - aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, ret, opm, blen, indexesIn, inCP); + aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, result, opm, blen, indexesIn, inCP); } - ret.recomputeNonZeros(); + + result.recomputeNonZeros(); if(op.aggOp.existsCorrection() && !inCP) { - ret = addCorrection(ret, op); + result = addCorrection(result, op); if(op.aggOp.increOp.fn instanceof Mean) - ret = addCellCount(ret, op, inputMatrix.getNumRows(), inputMatrix.getNumColumns()); + result = addCellCount(result, op, inputMatrix.getNumRows(), inputMatrix.getNumColumns()); } - return ret; + return result; + + } + private static boolean requireDecompression(CompressedMatrixBlock inputMatrix, AggregateUnaryOperator op) { + return inputMatrix.isOverlapping() && + (op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin && + (((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN || + ((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))); } private static MatrixBlock addCorrection(MatrixBlock ret, AggregateUnaryOperator op) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java deleted file mode 100644 index d4fbb7e..0000000 --- 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -// package org.apache.sysds.runtime.compress.lib; - -// import java.util.ArrayList; -// import java.util.Arrays; -// import java.util.List; -// import java.util.concurrent.Callable; -// import java.util.concurrent.ExecutionException; -// import java.util.concurrent.ExecutorService; -// import java.util.concurrent.Future; - -// import org.apache.sysds.hops.OptimizerUtils; -// import org.apache.sysds.runtime.DMLRuntimeException; -// import org.apache.sysds.runtime.compress.CompressedMatrixBlock; -// import org.apache.sysds.runtime.compress.CompressionSettings; -// import org.apache.sysds.runtime.compress.colgroup.AColGroup; -// import org.apache.sysds.runtime.compress.colgroup.ColGroupConst; -// import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; -// import org.apache.sysds.runtime.functionobjects.Equals; -// import org.apache.sysds.runtime.functionobjects.GreaterThan; -// import org.apache.sysds.runtime.functionobjects.GreaterThanEquals; -// import org.apache.sysds.runtime.functionobjects.LessThan; -// import org.apache.sysds.runtime.functionobjects.LessThanEquals; -// import org.apache.sysds.runtime.functionobjects.NotEquals; -// import org.apache.sysds.runtime.matrix.data.MatrixBlock; -// import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator; -// import org.apache.sysds.runtime.matrix.operators.RightScalarOperator; -// import org.apache.sysds.runtime.matrix.operators.ScalarOperator; -// import org.apache.sysds.runtime.util.CommonThreadPool; - -// /** -// * This class is used for relational operators that return binary values depending on individual cells values in the -// * compression. This indicate that the resulting vectors/matrices are amenable to compression since they only contain -// * two distinct values, true or false. -// * -// */ -// public class CLALibRelationalOp { -// // private static final Log LOG = LogFactory.getLog(LibRelationalOp.class.getName()); - -// /** Thread pool matrix Block for materializing decompressed groups. 
*/ -// private static ThreadLocal<MatrixBlock> memPool = new ThreadLocal<MatrixBlock>() { -// @Override -// protected MatrixBlock initialValue() { -// return null; -// } -// }; - -// protected static boolean isValidForRelationalOperation(ScalarOperator sop, CompressedMatrixBlock m1) { -// return m1.isOverlapping() && -// (sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan || -// sop.fn instanceof GreaterThanEquals || sop.fn instanceof Equals || sop.fn instanceof NotEquals); -// } - -// // public static MatrixBlock overlappingRelativeRelationalOperation(ScalarOperator sop, CompressedMatrixBlock m1) { - -// // List<AColGroup> colGroups = m1.getColGroups(); -// // boolean less = ((sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals) && -// // sop instanceof LeftScalarOperator) || -// // (sop instanceof RightScalarOperator && -// // (sop.fn instanceof GreaterThan || sop.fn instanceof GreaterThanEquals)); -// // double v = sop.getConstant(); -// // double min = m1.min(); -// // double max = m1.max(); - -// // // Shortcut: -// // // If we know worst case min and worst case max and the values to compare to in all cases is -// // // less then or greater than worst then we can return a full matrix with either 1 or 0. - -// // if(v < min || v > max) { -// // if(sop.fn instanceof Equals) { -// // return makeConstZero(m1.getNumRows(), m1.getNumColumns()); -// // } -// // else if(sop.fn instanceof NotEquals) { -// // return makeConstOne(m1.getNumRows(), m1.getNumColumns()); -// // } -// // else if(less) { -// // if(v < min || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v <= min)) -// // return makeConstOne(m1.getNumRows(), m1.getNumColumns()); -// // else -// // return makeConstZero(m1.getNumRows(), m1.getNumColumns()); -// // } -// // else { -// // if(v > max || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v >= max)) -// // return makeConstOne(m1.getNumRows(), m1.getNumColumns()); -// // else -// // return makeConstZero(m1.getNumRows(), m1.getNumColumns()); -// // } -// // } -// // else { -// // return processNonConstant(sop, minMax, min, max, m1.getNumRows(), m1.getNumColumns(), less); -// // } - -// // } - -// private static MatrixBlock makeConstOne(int rows, int cols) { -// // List<AColGroup> newColGroups = new ArrayList<>(); -// // int[] colIndexes = new int[cols]; -// // for(int i = 0; i < colIndexes.length; i++) { -// // colIndexes[i] = i; -// // } -// // double[] values = new double[cols]; -// // Arrays.fill(values, 1); - -// // newColGroups.add(new ColGroupConst(colIndexes, rows, new Dictionary(values))); -// // CompressedMatrixBlock ret = new CompressedMatrixBlock(rows, cols); -// // ret.allocateColGroupList(newColGroups); -// // ret.setNonZeros(cols * rows); -// // ret.setOverlapping(false); -// // return ret; -// // } - -// // private static MatrixBlock makeConstZero(int rows, int cols) { -// // MatrixBlock sb = new MatrixBlock(rows, cols, true, 0); -// // return sb; -// // } - -// // private static MatrixBlock processNonConstant(ScalarOperator sop, MinMaxGroup[] minMax, double minS, double maxS, -// // final int rows, final int cols, boolean less) { - -// // // BitSet res = new BitSet(ret.getNumColumns() * ret.getNumRows()); -// // MatrixBlock res = new MatrixBlock(rows, cols, true, 0).allocateBlock(); -// // int k = OptimizerUtils.getConstrainedNumThreads(-1); -// // int outRows = rows; -// // long nnz = 0; -// // if(k == 1) { -// // final int b = 
CompressionSettings.BITMAP_BLOCK_SZ / cols; -// // final int blkz = (outRows < b) ? outRows : b; - -// // MatrixBlock tmp = new MatrixBlock(blkz, cols, false, -1).allocateBlock(); -// // for(int i = 0; i * blkz < outRows; i++) { -// // for(MinMaxGroup mmg : minMax) -// // mmg.g.decompressToBlockUnSafe(tmp, i * blkz, Math.min((i + 1) * blkz, rows), 0); - -// // for(int row = 0; row < blkz && row < rows - i * blkz; row++) { -// // int off = (row + i * blkz); -// // for(int col = 0; col < cols; col++) { -// // res.quickSetValue(off, col, sop.executeScalar(tmp.quickGetValue(row, col))); -// // if(res.quickGetValue(off, col) != 0) { -// // nnz++; -// // } -// // } -// // } -// // } -// // tmp.reset(); -// // res.setNonZeros(nnz); -// // } -// // else { -// // final int blkz = CompressionSettings.BITMAP_BLOCK_SZ / 2; -// // ExecutorService pool = CommonThreadPool.get(k); -// // ArrayList<RelationalTask> tasks = new ArrayList<>(); - -// // try { -// // for(int i = 0; i * blkz < outRows; i++) { -// // RelationalTask rt = new RelationalTask(minMax, i, blkz, res, rows, cols, sop); -// // tasks.add(rt); -// // } -// // List<Future<Object>> futures = pool.invokeAll(tasks); -// // pool.shutdown(); -// // for(Future<Object> f : futures) -// // f.get(); -// // } -// // catch(InterruptedException | ExecutionException e) { -// // e.printStackTrace(); -// // throw new DMLRuntimeException(e); -// // } - -// // } -// // memPool.remove(); - -// // return res; -// // } - -// // protected static class MinMaxGroup implements Comparable<MinMaxGroup> { -// // double min; -// // double max; -// // AColGroup g; -// // double[] values; - -// // public MinMaxGroup(double min, double max, AColGroup g) { -// // this.min = min; -// // this.max = max; -// // this.g = g; - -// // this.values = g.getValues(); -// // } - -// // @Override -// // public int compareTo(MinMaxGroup o) { -// // double t = max - min; -// // double ot = o.max - o.min; -// // return Double.compare(t, ot); -// // } - -// // @Override -// // public String toString() { -// // StringBuilder sb = new StringBuilder(); -// // sb.append("MMG: "); -// // sb.append("[" + min + "," + max + "]"); -// // sb.append(" " + g.getClass().getSimpleName()); -// // return sb.toString(); -// // } -// // } - -// // private static class RelationalTask implements Callable<Object> { -// // private final MinMaxGroup[] _minMax; -// // private final int _i; -// // private final int _blkz; -// // private final MatrixBlock _res; -// // private final int _rows; -// // private final int _cols; -// // private final ScalarOperator _sop; - -// // protected RelationalTask(MinMaxGroup[] minMax, int i, int blkz, MatrixBlock res, int rows, int cols, -// // ScalarOperator sop) { -// // _minMax = minMax; -// // _i = i; -// // _blkz = blkz; -// // _res = res; -// // _rows = rows; -// // _cols = cols; -// // _sop = sop; -// // } - -// // @Override -// // public Object call() { -// // MatrixBlock tmp = memPool.get(); -// // if(tmp == null) { -// // memPool.set(new MatrixBlock(_blkz, _cols, false, -1).allocateBlock()); -// // tmp = memPool.get(); -// // } -// // else { -// // tmp = memPool.get(); -// // tmp.reset(_blkz, _cols, false, -1); -// // } - -// // for(MinMaxGroup mmg : _minMax) { -// // if(mmg.g.getNumberNonZeros() != 0) -// // mmg.g.decompressToBlockUnSafe(tmp, _i * _blkz, Math.min((_i + 1) * _blkz, mmg.g.getNumRows()), 0); -// // } - -// // for(int row = 0, off = _i * _blkz; row < _blkz && row < _rows - _i * _blkz; row++, off++) { -// // for(int col = 0; col < _cols; col++) { -// // 
_res.appendValue(off, col, _sop.executeScalar(tmp.quickGetValue(row, col))); -// // } -// // } -// // return null; -// // } -// // } -// } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java index aea5488..1822685 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java @@ -56,13 +56,6 @@ public class CLALibScalar { private static final int MINIMUM_PARALLEL_SIZE = 8096; public static MatrixBlock scalarOperations(ScalarOperator sop, CompressedMatrixBlock m1, MatrixValue result) { - // Special case handling of overlapping relational operations - // if(CLALibRelationalOp.isValidForRelationalOperation(sop, m1)) { - // MatrixBlock ret = CLALibRelationalOp.overlappingRelativeRelationalOperation(sop, m1); - // ret.recomputeNonZeros(); - // return ret; - // } - if(isInvalidForCompressedOutput(m1, sop)) { LOG.warn("scalar overlapping not supported for op: " + sop.fn); MatrixBlock m1d = m1.decompress(sop.getNumThreads()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java index b37acc1..31b3714 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java +++ b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java @@ -404,7 +404,10 @@ public class WorkloadAnalyzer { ArrayList<Hop> in = hop.getInput(); if(isOverlapping(in.get(0)) || isOverlapping(in.get(1))) overlapping.add(hop.getHopID()); - return new OpNormal(hop, true); + // CBind is in the worst case decompressing, but can compress the other side if it is trivially compressible. + // To keep the optimizer correct we mark this operation as decompressing, since that is the worst possible outcome. + // Currently we do not optimize operations that are located past a cbind. 
+ return new OpDecompressing(hop); } else if(HopRewriteUtils.isBinary(hop, OpOp2.RBIND)) { ArrayList<Hop> in = hop.getInput(); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java index 880f31f..faef2c7 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java @@ -66,6 +66,7 @@ import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.data.TensorBlock; import org.apache.sysds.runtime.data.TensorIndexes; import org.apache.sysds.runtime.instructions.cp.Data; +import org.apache.sysds.runtime.instructions.spark.DeCompressionSPInstruction; import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject; import org.apache.sysds.runtime.instructions.spark.data.LineageObject; import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock; @@ -1280,16 +1281,21 @@ public class SparkExecutionContext extends ExecutionContext return out; } - @SuppressWarnings("unchecked") + // @SuppressWarnings("unchecked") public static long writeMatrixRDDtoHDFS( RDDObject rdd, String path, FileFormat fmt ) { JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(); InputOutputInfo oinfo = InputOutputInfo.get(DataType.MATRIX, fmt); + // if compression is enabled, decompress all blocks before writing to disk (temporary modification until MatrixBlock is merged with CompressedMatrixBlock) + if(ConfigurationManager.isCompressionEnabled()) + lrdd = lrdd.mapValues(new DeCompressionSPInstruction.DeCompressionFunction()); + //piggyback nnz maintenance on write LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz"); lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); + //save file is an action which also triggers nnz maintenance lrdd.saveAsHadoopFile(path, oinfo.keyClass, diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java index 4ea0284..39b2408 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java @@ -30,6 +30,12 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +/** + * Instruction that performs the binary unary-aggregate chain + * + * res = X / rowsum(X) + * + */ public class BinUaggChainSPInstruction extends UnarySPInstruction { // operators private BinaryOperator _bOp = null; diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java index 6115654..a998a74 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java @@ -29,6 +29,7 @@ import java.io.DataOutputStream; import java.util.Collection; import org.apache.commons.math3.random.Well1024a; +import org.apache.sysds.common.Types.CorrectionLocationType; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; 
import org.apache.sysds.runtime.compress.CompressedMatrixBlock; @@ -36,10 +37,15 @@ import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; import org.apache.sysds.runtime.compress.CompressionStatistics; import org.apache.sysds.runtime.compress.colgroup.AColGroup; import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; +import org.apache.sysds.runtime.functionobjects.KahanPlus; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.ReduceAll; import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct; import org.apache.sysds.runtime.matrix.data.LibMatrixDatagen; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.RandomMatrixGenerator; +import org.apache.sysds.runtime.matrix.operators.AggregateOperator; +import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator; import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator.CountDistinctTypes; @@ -570,6 +576,33 @@ public class CompressedMatrixTest extends AbstractCompressedUnaryTests { cmb.copy(cmb); } + @Test + public void testAggregateTernaryOperation() { + try { + if(!(cmb instanceof CompressedMatrixBlock) || rows * cols > 10000) + return; + CorrectionLocationType corr = CorrectionLocationType.LASTCOLUMN; + AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), corr); + AggregateTernaryOperator op = new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg, + ReduceAll.getReduceAllFnObject()); + + int nrow = mb.getNumRows(); + int ncol = mb.getNumColumns(); + + MatrixBlock m2 = new MatrixBlock(nrow, ncol, 13.0); + MatrixBlock m3 = new MatrixBlock(nrow, ncol, 14.0); + + MatrixBlock ret1 = cmb.aggregateTernaryOperations(cmb, m2, m3, null, op, true); + MatrixBlock ret2 = mb.aggregateTernaryOperations(mb, m2, m3, null, op, true); + + compareResultMatrices(ret2, ret1, 1); + } + catch(Exception e) { + e.printStackTrace(); + throw new DMLRuntimeException(e); + } + } + private static long getJolSize(CompressedMatrixBlock cmb, CompressionStatistics cStat) { Layouter l = new HotSpotLayouter(new X86_64_DataModel()); long jolEstimate = 0; diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java index eb5ceca..c291235 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java +++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java @@ -31,7 +31,6 @@ import java.util.List; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.sysds.common.Types.CorrectionLocationType; import org.apache.sysds.lops.MMTSJ.MMTSJType; import org.apache.sysds.lops.MapMultChain.ChainType; import org.apache.sysds.runtime.DMLRuntimeException; @@ -55,22 +54,18 @@ import org.apache.sysds.runtime.functionobjects.Divide; import org.apache.sysds.runtime.functionobjects.Equals; import org.apache.sysds.runtime.functionobjects.GreaterThan; import org.apache.sysds.runtime.functionobjects.GreaterThanEquals; -import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.functionobjects.LessThan; import 
org.apache.sysds.runtime.functionobjects.LessThanEquals; import org.apache.sysds.runtime.functionobjects.Minus; import org.apache.sysds.runtime.functionobjects.Multiply; import org.apache.sysds.runtime.functionobjects.Plus; import org.apache.sysds.runtime.functionobjects.Power2; -import org.apache.sysds.runtime.functionobjects.ReduceAll; import org.apache.sysds.runtime.functionobjects.SwapIndex; import org.apache.sysds.runtime.functionobjects.ValueFunction; import org.apache.sysds.runtime.functionobjects.Xor; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator; -import org.apache.sysds.runtime.matrix.operators.AggregateOperator; -import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator; import org.apache.sysds.runtime.matrix.operators.ReorgOperator; @@ -1253,28 +1248,6 @@ public abstract class CompressedTestBase extends TestBase { } @Test - public void aggregateTernaryOperations() { - if(!(cmb instanceof CompressedMatrixBlock) || rows * cols > 10000) - return; - - MatrixBlock m1 = new MatrixBlock(); - MatrixBlock m2 = new MatrixBlock(); - MatrixBlock m3 = null; - CorrectionLocationType corr = CorrectionLocationType.LASTCOLUMN; - AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), corr); - AggregateTernaryOperator op = new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg, - ReduceAll.getReduceAllFnObject(), _k); - - boolean inCP = true; - - MatrixBlock ret1 = mb.aggregateTernaryOperations(m1, m2, m3, null, op, inCP); - MatrixBlock ret2 = cmb.aggregateTernaryOperations(m1, m2, m3, null, op, inCP); - - compareResultMatrices(ret1, ret2, 1); - - } - - @Test public void unaryOperations() { if(!(cmb instanceof CompressedMatrixBlock) || cmb.getNumColumns() < 2) return; diff --git a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java index aacb716..9422e58 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java @@ -119,7 +119,7 @@ public class WorkloadTest { args.put("$2", "FALSE"); args.put("$3", "0"); - tests.add(new Object[] {0, 0, 0, 1, 1, 1, 6, 0, true, false, "functions/lmDS.dml", args}); + tests.add(new Object[] {0, 1, 0, 1, 1, 1, 6, 0, true, false, "functions/lmDS.dml", args}); tests.add(new Object[] {0, 0, 0, 1, 0, 1, 0, 0, true, true, "functions/lmDS.dml", args}); tests.add(new Object[] {0, 0, 0, 1, 10, 10, 1, 0, true, true, "functions/lmCG.dml", args}); @@ -134,15 +134,15 @@ public class WorkloadTest { args.put("$1", testFile); args.put("$2", "TRUE"); args.put("$3", "1"); - tests.add(new Object[] {0, 0, 1, 1, 1, 1, 1, 0, true, true, "functions/lmDS.dml", args}); - tests.add(new Object[] {0, 0, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args}); + tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, "functions/lmDS.dml", args}); + tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args}); args = new HashMap<>(); args.put("$1", testFile); args.put("$2", "TRUE"); args.put("$3", "2"); - tests.add(new Object[] {0, 0, 1, 1, 1, 1, 3, 0, true, true, "functions/lmDS.dml", args}); - 
tests.add(new Object[] {0, 0, 1, 1, 11, 10, 4, 0, true, true, "functions/lmCG.dml", args}); + tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, "functions/lmDS.dml", args}); + tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args}); args = new HashMap<>(); args.put("$1", testFile); @@ -176,7 +176,7 @@ public class WorkloadTest { CostEstimatorBuilder ceb = new CostEstimatorBuilder(wtr); InstructionTypeCounter itc = ceb.getCounter(); - verify(wtr, itc, ceb); + verify(wtr, itc, ceb, scriptName, args); } catch(Exception e) { e.printStackTrace(); @@ -184,9 +184,9 @@ public class WorkloadTest { } } - private void verify(WTreeRoot wtr, InstructionTypeCounter itc, CostEstimatorBuilder ceb) { + private void verify(WTreeRoot wtr, InstructionTypeCounter itc, CostEstimatorBuilder ceb, String name, Map<String, String> args) { - String errorString = wtr + "\n" + itc + " \n "; + String errorString = wtr + "\n" + itc + " \n " + name + " -- " + args + "\n"; Assert.assertEquals(errorString + "scans:", scans, itc.getScans()); Assert.assertEquals(errorString + "decompressions", decompressions, itc.getDecompressions()); Assert.assertEquals(errorString + "overlappingDecompressions", overlappingDecompressions, @@ -197,7 +197,7 @@ public class WorkloadTest { itc.getCompressedMultiplications()); Assert.assertEquals(errorString + "dictionaryOps", dictionaryOps, itc.getDictionaryOps()); Assert.assertEquals(errorString + "lookup", indexing, itc.getIndexing()); - Assert.assertEquals(shouldCompress, ceb.shouldTryToCompress()); + Assert.assertEquals(errorString + "Should Compress", shouldCompress, ceb.shouldTryToCompress()); } private static WTreeRoot getWorkloadTree(DMLProgram prog) { diff --git a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java index 9ffeb02..5de8880 100644 --- a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java +++ b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java @@ -73,7 +73,7 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase { @Test public void testMLogRegCP() { - runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 2, false); + runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 1, false); } @Test
