This is an automated email from the ASF dual-hosted git repository. janniklinde pushed a commit to branch pr-2420 in repository https://gitbox.apache.org/repos/asf/systemds.git
commit ca3b64d61b58663f650a1ad5841ce12a0b1de2da Author: mori49 <[email protected]> AuthorDate: Tue Apr 14 10:05:52 2026 +0200 [SYSTEMDS-3543] Piece-wise linear compression of column groups Closes #2420. --- .../runtime/compress/CompressionSettings.java | 26 +- .../sysds/runtime/compress/colgroup/AColGroup.java | 7 +- .../runtime/compress/colgroup/ColGroupFactory.java | 87 ++- .../ColGroupPiecewiseLinearCompressed.java | 678 +++++++++++++++++++++ .../colgroup/functional/PiecewiseLinearUtils.java | 306 ++++++++++ .../PiecewiseLinearCompressionPerformanceTest.java | 168 +++++ ...oupPiecewiseLinearCompressedOperationsTest.java | 308 ++++++++++ .../ColGroupPiecewiseLinearCompressedTest.java | 455 ++++++++++++++ 8 files changed, 2016 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java index af944fce75..99c4b9c2ec 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java @@ -55,16 +55,16 @@ public class CompressionSettings { /** * The sampling ratio used when choosing ColGroups. Note that, default behavior is to use exact estimator if the * number of elements is below 1000. - * + * * DEPRECATED */ public final double samplingRatio; /** * The sampling ratio power to use when choosing sample size. This is used in accordance to the function: - * + * * sampleSize += nRows^samplePower; - * + * * The value is bounded to be in the range of 0 to 1, 1 giving a sample size of everything, and 0 adding 1. */ public final double samplePower; @@ -114,8 +114,9 @@ public class CompressionSettings { /** * Transpose input matrix, to optimize access when extracting bitmaps. This setting is changed inside the script * based on the transposeInput setting. 
- * - * This is intentionally left as a mutable value, since the transposition of the input matrix is decided in phase 3. + * + * This is intentionally left as a mutable value, since the transposition of the input matrix is decided in phase + * 3. */ public boolean transposed = false; @@ -135,6 +136,19 @@ public class CompressionSettings { public final boolean preferDeltaEncoding; + // Target loss setting for piecewise linear compression + + private double piecewiseTargetLoss = Double.NaN; + + public void setPiecewiseTargetLoss(double piecewiseTargetLoss) { + this.piecewiseTargetLoss = piecewiseTargetLoss; + + } + + public double getPiecewiseTargetLoss() { + return piecewiseTargetLoss; + } + protected CompressionSettings(double samplingRatio, double samplePower, boolean allowSharedDictionary, String transposeInput, int seed, boolean lossy, EnumSet<CompressionType> validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage, @@ -161,7 +175,7 @@ public class CompressionSettings { this.sdcSortType = sdcSortType; this.scaleFactors = scaleFactors; this.preferDeltaEncoding = preferDeltaEncoding; - + if(!printedStatus && LOG.isDebugEnabled()) { printedStatus = true; LOG.debug(this.toString()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java index fbe04c732e..57bba0bef1 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java @@ -65,7 +65,8 @@ public abstract class AColGroup implements Serializable { /** Public super types of compression ColGroups supported */ public static enum CompressionType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, DDCLZW, LinearFunctional; + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, DeltaDDC, DDCLZW, 
LinearFunctional, + PiecewiseLinear, PiecewiseLinearSuccessive; public boolean isDense() { return this == DDC || this == CONST || this == DDCFOR || this == DDCFOR || this == DDCLZW; @@ -86,8 +87,8 @@ public abstract class AColGroup implements Serializable { * Protected such that outside the ColGroup package it should be unknown which specific subtype is used. */ protected static enum ColGroupType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DDCLZW, DeltaDDC, - LinearFunctional; + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DeltaDDC, + LinearFunctional, PiecewiseLinear; } /** The ColGroup indexes contained in the ColGroup */ diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java index 7699d7b7c1..89bb040d44 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java @@ -43,6 +43,7 @@ import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary; import org.apache.sysds.runtime.compress.colgroup.functional.LinearRegression; +import org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils; import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; import org.apache.sysds.runtime.compress.colgroup.insertionsort.AInsertionSorter; @@ -106,7 +107,7 @@ public class ColGroupFactory { /** * The actual compression method, that handles the logic of compressing multiple columns together. - * + * * @param in The input matrix, that could have been transposed. 
If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -120,7 +121,7 @@ public class ColGroupFactory { /** * The actual compression method, that handles the logic of compressing multiple columns together. - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -135,7 +136,7 @@ public class ColGroupFactory { } /** - * + * * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify * this. * @param csi The compression information extracted from the estimation, this contains which groups of columns to @@ -232,8 +233,9 @@ public class ColGroupFactory { time, retType, estC, actC, act.getNumValues(), cols, wanted, warning)); } else { - LOG.debug(String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", - time, retType, estC, actC, act.getNumValues(), cols, wanted)); + LOG.debug( + String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", time, + retType, estC, actC, act.getNumValues(), cols, wanted)); } } @@ -306,6 +308,12 @@ public class ColGroupFactory { return compressLinearFunctional(colIndexes, in, cs); } } + else if(ct == CompressionType.PiecewiseLinear) { + return compressPiecewiseLinearFunctional(colIndexes, in, cs); + } + else if(ct == CompressionType.PiecewiseLinearSuccessive) { + return compressPiecewiseLinearFunctionalSuccessive(colIndexes, in, cs); + } else if(ct == CompressionType.DDCFOR) { AColGroup g = directCompressDDC(colIndexes, cg); if(g instanceof ColGroupDDC) @@ -710,7 +718,7 @@ public class ColGroupFactory { if(cs.scaleFactors != null) { throw new NotImplementedException("Delta encoding with quantization not yet 
implemented"); } - + if(colIndexes.size() > 1) { return directCompressDeltaDDCMultiCol(colIndexes, cg); } @@ -742,7 +750,7 @@ public class ColGroupFactory { if(map.size() == 0) return new ColGroupEmpty(colIndexes); - + final double[] dictValues = map.getDictionary(); IDictionary dict = new DeltaDictionary(dictValues, 1); @@ -751,7 +759,8 @@ public class ColGroupFactory { return ColGroupDeltaDDC.create(colIndexes, dict, resData, null); } - private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, CompressedSizeInfoColGroup cg) throws Exception { + private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, CompressedSizeInfoColGroup cg) + throws Exception { final AMapToData d = MapToFactory.create(nRow, Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126)); final int fill = d.getUpperBoundValue(); d.fill(fill); @@ -830,8 +839,8 @@ public class ColGroupFactory { int fill) { ReaderColumnSelection reader = (cs.scaleFactors == null) ? ReaderColumnSelection.createReader(in, colIndexes, - cs.transposed, rl, - ru) : ReaderColumnSelection.createQuantizedReader(in, colIndexes, cs.transposed, rl, ru, cs.scaleFactors); + cs.transposed, rl, ru) : ReaderColumnSelection.createQuantizedReader(in, colIndexes, cs.transposed, rl, ru, + cs.scaleFactors); DblArray cellVals = reader.nextRow(); boolean extra = false; @@ -1078,6 +1087,64 @@ public class ColGroupFactory { return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows); } + /** + * This method is the entry point to compress a matrix with piecewise linear compression The first method uses a + * segmented least squares with dynamic programming to compress the columns The second method uses a successive + * compression method, which compares each values in linear time and checks if the targetloss exceeded + * + * @param colIndexes the column indices to compress + * @param in the input Matrixblock containing the data + * @param cs compression settings to define the target loss, which should be 
considered + * @return a piecewise linear compressed column group + */ + + public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in, + CompressionSettings cs) { + + final int numRows = in.getNumRows(); + final int numCols = colIndexes.size(); + int[][] breakpointsPerCol = new int[numCols][]; + double[][] slopesPerCol = new double[numCols][]; + double[][] interceptsPerCol = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + final int colIdx = colIndexes.get(col); + double[] column = PiecewiseLinearUtils.getColumn(in, colIdx); + PiecewiseLinearUtils.SegmentedRegression fit = PiecewiseLinearUtils.compressSegmentedLeastSquares(column, + cs); + breakpointsPerCol[col] = fit.getBreakpoints(); + interceptsPerCol[col] = fit.getIntercepts(); + slopesPerCol[col] = fit.getSlopes(); + + } + return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpointsPerCol, slopesPerCol, interceptsPerCol, + numRows); + + } + + public static AColGroup compressPiecewiseLinearFunctionalSuccessive(IColIndex colIndexes, MatrixBlock in, + CompressionSettings cs) { + final int numRows = in.getNumRows(); + final int numCols = colIndexes.size(); + int[][] breakpointsPerCol = new int[numCols][]; + double[][] slopesPerCol = new double[numCols][]; + double[][] interceptsPerCol = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + final int colIdx = colIndexes.get(col); + double[] column = PiecewiseLinearUtils.getColumn(in, colIdx); + PiecewiseLinearUtils.SegmentedRegression fit = PiecewiseLinearUtils.compressSuccessivePiecewiseLinear( + column, cs); + breakpointsPerCol[col] = fit.getBreakpoints(); + interceptsPerCol[col] = fit.getIntercepts(); + slopesPerCol[col] = fit.getSlopes(); + + } + return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpointsPerCol, slopesPerCol, interceptsPerCol, + numRows); + + } + private AColGroup compressSDCFromSparseTransposedBlock(IColIndex cols, int nrUniqueEstimate, double 
tupleSparsity) { if(cols.size() > 1) return compressMultiColSDCFromSparseTransposedBlock(cols, nrUniqueEstimate, tupleSparsity); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java new file mode 100644 index 0000000000..f05a5d46e7 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java @@ -0,0 +1,678 @@ +package org.apache.sysds.runtime.compress.colgroup; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.data.SparseBlockMCSR; +import org.apache.sysds.runtime.functionobjects.*; +import org.apache.sysds.runtime.instructions.cp.CmCovObject; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.CMOperator; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.apache.sysds.runtime.matrix.operators.UnaryOperator; +import org.apache.sysds.utils.MemoryEstimates; + +import java.util.Arrays; + +/** + * This class represents a new ColGroup which is compresses column into segments (piecewise linear) to represent the + * original Data each column is approximate by a set of linear segments defined by breakpoints, slopes and intercepts + */ + +public class ColGroupPiecewiseLinearCompressed extends AColGroupCompressed { + /** + * breakpoints indices per column to define the segment boundaries slopes of 
the regression line per segment per + * column intercepts of the regression line per segment per column + */ + int[][] breakpointsPerCol; + double[][] slopesPerCol; + double[][] interceptsPerCol; + int numRows; + + protected ColGroupPiecewiseLinearCompressed(IColIndex colIndices) { + super(colIndices); + } + + public ColGroupPiecewiseLinearCompressed(IColIndex colIndices, int[][] breakpoints, double[][] slopes, + double[][] intercepts, int numRows) { + super(colIndices); + this.breakpointsPerCol = breakpoints; + this.slopesPerCol = slopes.clone(); + this.interceptsPerCol = intercepts.clone(); + this.numRows = numRows; + } + + /** + * creates a new piecewise linear compress column group validates inputs and copies all arrays before storing + * + * @param colIndices the column indices this group represents + * @param breakpointsPerCol breakpoint indices per column + * @param slopesPerCol slope of each segment per column + * @param interceptsPerCol intercept of each segment per column + * @param numRows number of rows in the original matrix + * @return a new ColGroupPiecewiseLinearCompressed instance + * @throws IllegalArgumentException if breakpoints are invalid or arrays are inconsistent + */ + + public static AColGroup create(IColIndex colIndices, int[][] breakpointsPerCol, double[][] slopesPerCol, + double[][] interceptsPerCol, int numRows) { + final int numCols = colIndices.size(); + if(breakpointsPerCol.length != numCols) + throw new IllegalArgumentException( + "bp.length=" + breakpointsPerCol.length + " != colIndices.size()=" + numCols); + + for(int c = 0; c < numCols; c++) { + if(breakpointsPerCol[c].length < 1 || breakpointsPerCol[c][0] != 0 || + breakpointsPerCol[c][breakpointsPerCol[c].length - 1] != numRows) + throw new IllegalArgumentException( + "Invalid breakpoints for col " + c + ": must start=0, end=numRows, >=1 pts"); + + if(slopesPerCol[c].length != interceptsPerCol[c].length || + slopesPerCol[c].length != breakpointsPerCol[c].length - 1) + throw 
new IllegalArgumentException("Inconsistent array lengths col " + c); + } + + int[][] bpCopy = new int[numCols][]; + double[][] slopeCopy = new double[numCols][]; + double[][] interceptCopy = new double[numCols][]; + // defensive copy to prevent external modification + for(int c = 0; c < numCols; c++) { + bpCopy[c] = Arrays.copyOf(breakpointsPerCol[c], breakpointsPerCol[c].length); + slopeCopy[c] = Arrays.copyOf(slopesPerCol[c], slopesPerCol[c].length); + interceptCopy[c] = Arrays.copyOf(interceptsPerCol[c], interceptsPerCol[c].length); + } + + return new ColGroupPiecewiseLinearCompressed(colIndices, bpCopy, slopeCopy, interceptCopy, numRows); + + } + + /** + * Decompresses a ColGroupPiecewiseLinearCompress into a DenseBlock Each value is reconstructed via slopes[seg]*row + * + intercept[seg] + * + * @param db Target DenseBlock + * @param rl Row to start decompression from + * @param ru Row to end decompression at (not inclusive) + * @param offR Row offset into the target to decompress + * @param offC Column offset into the target to decompress + */ + @Override + public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC) { + if(db == null || _colIndexes == null || _colIndexes.size() == 0 || breakpointsPerCol == null || + slopesPerCol == null || interceptsPerCol == null) { + return; + } + for(int col = 0; col < _colIndexes.size(); col++) { + final int colIndex = _colIndexes.get(col); + int[] breakpoints = breakpointsPerCol[col]; + double[] slopes = slopesPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + // per segment in this column + for(int seg = 0; seg + 1 < breakpoints.length; seg++) { + int segStart = breakpoints[seg]; + int segEnd = breakpoints[seg + 1]; + if(segStart >= segEnd) + continue; + + double currentSlopeInSegment = slopes[seg]; + double currentInterceptInSegment = intercepts[seg]; + // intersect segment with requested row range [rl, ru) + + int rowStart = Math.max(segStart, rl); + int rowEnd = Math.min(segEnd, 
ru); + if(rowStart >= rowEnd) + continue; + + // Fill the DenseBlock for this column and segment + for(int row = rowStart; row < rowEnd; row++) { + double yhat = currentSlopeInSegment * row + currentInterceptInSegment; + int dbRow = offR + row; + int dbCol = offC + colIndex; + + if(dbRow >= 0 && dbRow < db.numRows() && dbCol >= 0 && dbCol < db.numCols()) { + db.set(dbRow, dbCol, yhat); + } + } + + } + + } + } + + public int[][] getBreakpointsPerCol() { + return breakpointsPerCol; + } + + public double[][] getSlopesPerCol() { + return slopesPerCol; + } + + public double[][] getInterceptsPerCol() { + return interceptsPerCol; + } + + /** + * Returns the decompressed value at row r and column colIdx, using binary search to find the containing segment. + * + * @param r row + * @param colIdx column index in the _colIndexes. + * @return reconstructed value with slope[segment]*r+intercepts[segment] + */ + @Override + public double getIdx(int r, int colIdx) { + //safety check + if(r < 0 || r >= numRows || colIdx < 0 || colIdx >= _colIndexes.size()) { + return 0.0; + } + int[] breakpoints = breakpointsPerCol[colIdx]; + double[] slopes = slopesPerCol[colIdx]; + double[] intercepts = interceptsPerCol[colIdx]; + // binary search for the segment containing row r + int lowerBound = 0; + int higherBound = breakpoints.length - 2; + while(lowerBound <= higherBound) { + int mid = (lowerBound + higherBound) / 2; + if(r < breakpoints[mid + 1]) { + higherBound = mid - 1; + } + else + lowerBound = mid + 1; + } + int segment = Math.min(lowerBound, breakpoints.length - 2); + return slopes[segment] * (double) r + intercepts[segment]; + } + + /** + * Returns the total number of stored values across all columns, counting breakpoints, slopes and intercepts per + * column + * + * @return total number of stored compression values + */ + @Override + public int getNumValues() { + int total = 0; + for(int c = 0; c < _colIndexes.size(); c++) { + total += breakpointsPerCol[c].length + slopesPerCol[c].length + 
interceptsPerCol[c].length; + } + return total; + } + + /** + * Returns the exact size on disk in bytes includes per column arrays for breakpoints, slopes, intercepts + * + * @return size in bytes + */ + @Override + public long getExactSizeOnDisk() { + long ret = super.getExactSizeOnDisk(); + int numCols = _colIndexes.size(); + ret += 8L * numCols * 3; //array reference pointers + ret += 24L * 3; // outer array headers + ret += 4L; //numRows field + + for(int c = 0; c < numCols; c++) { + ret += (long) MemoryEstimates.intArrayCost(breakpointsPerCol[c].length); + ret += (long) MemoryEstimates.doubleArrayCost(slopesPerCol[c].length); + ret += (long) MemoryEstimates.doubleArrayCost(interceptsPerCol[c].length); + } + + return ret; + + } + + /** + * Computes the column sums of the decompressed matrix using sum of arithmetic series Where sumX = len * (2*start + + * len - 1) / 2 + * + * @param c output array to accumulate column sums into + * @param nRows number of rows, which is used because it is covered by the breakpoints + */ + @Override + public void computeSum(double[] c, int nRows) { + for(int col = 0; col < _colIndexes.size(); col++) { + double sum = 0.0; + int[] breakpoints = breakpointsPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + double[] slopes = slopesPerCol[col]; + + for(int seg = 0; seg < slopes.length; seg++) { + int start = breakpoints[seg]; + int end = breakpoints[seg + 1]; + int len = end - start; + if(len <= 0) + continue; + + double sumX = (double) len * (2.0 * start + (len - 1)) / 2.0; + sum += slopes[seg] * sumX + intercepts[seg] * len; + } + c[col] += sum; + } + } + + /** + * Computes column sums by delegating to computeSum Methods are identical because every ColGroup just knows its own + * column + * + * @param c The array to add the column sum to. + * @param nRows The number of rows in the column group. 
+ */ + + @Override + public void computeColSums(double[] c, int nRows) { + computeSum(c, nRows); + } + + @Override + public CompressionType getCompType() { + return CompressionType.PiecewiseLinear; + } + + @Override + protected ColGroupType getColGroupType() { + return ColGroupType.PiecewiseLinear; + } + + /** + * Applies a scalar operation to all segments of this column group For plus/minus operation are only the intercepts + * modified For Multiply/Divide slopes and intercepts are scaled + * + * @param op operation to perform + * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients + * @throws NotImplementedException if the operator is not plus, minus, multiply or divide + */ + @Override + public AColGroup scalarOperation(ScalarOperator op) { + final int numCols = _colIndexes.size(); + + if(!(op.fn instanceof Plus || op.fn instanceof Minus || op.fn instanceof Multiply || op.fn instanceof Divide)) { + throw new NotImplementedException("Unsupported scalar op: " + op.fn.getClass().getSimpleName()); + } + + double[][] newIntercepts = new double[numCols][]; + double[][] newSlopes = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + final int numSegments = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSegments]; + newSlopes[col] = new double[numSegments]; + + for(int seg = 0; seg < numSegments; seg++) { + if(op.fn instanceof Plus || op.fn instanceof Minus) { + // only intercepts changes + newSlopes[col][seg] = slopesPerCol[col][seg]; + newIntercepts[col][seg] = op.executeScalar(interceptsPerCol[col][seg]); + } + else { // Multiply/Divide + newSlopes[col][seg] = op.executeScalar(slopesPerCol[col][seg]); + newIntercepts[col][seg] = op.executeScalar(interceptsPerCol[col][seg]); + } + } + } + + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + /** + * Applies a row vector operation from the left For plus/minus are the intercepts shifted For 
multiply/divide slopes + * and intercepts are scaled + * + * @param op The operation to execute + * @param v The vector of values to apply the values contained should be at least the length of the highest + * value in the column index + * @param isRowSafe True if the binary op is applied to an entire zero row and all results are zero + * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients + */ + + @Override + public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) { + final int numCols = _colIndexes.size(); + double[][] newIntercepts = new double[numCols][]; + double[][] newSlopes = new double[numCols][]; + final boolean isAddSub = op.fn instanceof Plus || op.fn instanceof Minus; + + if(!isAddSub && !(op.fn instanceof Multiply || op.fn instanceof Divide)) + throw new NotImplementedException("Unsupported binary op: " + op.fn.getClass().getSimpleName()); + + for(int col = 0; col < numCols; col++) { + double rowValue = v[_colIndexes.get(col)]; + int numSegs = interceptsPerCol[col].length; + newIntercepts[col] = new double[numSegs]; + + // Plus/Minus: slope is translation-invariant, only intercept shifts + newSlopes[col] = isAddSub ? 
slopesPerCol[col].clone() : new double[numSegs]; + + for(int seg = 0; seg < numSegs; seg++) { + newIntercepts[col][seg] = op.fn.execute(rowValue, interceptsPerCol[col][seg]); + if(!isAddSub) + newSlopes[col][seg] = op.fn.execute(rowValue, slopesPerCol[col][seg]); + } + } + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + /** + * Applies a row vector operation from the right For plus/minus are the intercepts shifted For multiply/divide + * slopes and intercepts are scaled + * + * @param op The operation to execute + * @param v The vector of values to apply the values contained should be at least the length of the highest + * value in the column index + * @param isRowSafe True if the binary op is applied to an entire zero row and all results are zero + * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients + */ + @Override + public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) { + final int numCols = _colIndexes.size(); + final boolean isAddSub = op.fn instanceof Plus || op.fn instanceof Minus; + + if(!isAddSub && !(op.fn instanceof Multiply || op.fn instanceof Divide)) + throw new NotImplementedException("Unsupported scalar op: " + op.fn.getClass().getSimpleName()); + + double[][] newSlopes = new double[numCols][]; + double[][] newIntercepts = new double[numCols][]; + + for(int col = 0; col < numCols; col++) { + double val = v[_colIndexes.get(col)]; + int numSegs = interceptsPerCol[col].length; + // Plus/Minus shifts intercept only, slopes are unchanged + newSlopes[col] = isAddSub ? 
slopesPerCol[col].clone() : new double[numSegs]; + newIntercepts[col] = new double[numSegs]; + + for(int seg = 0; seg < numSegs; seg++) { + newIntercepts[col][seg] = op.fn.execute(interceptsPerCol[col][seg], val); + if(!isAddSub) + newSlopes[col][seg] = op.fn.execute(slopesPerCol[col][seg], val); + } + } + return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts, numRows); + } + + /** + * Returns true if any decompressed value in this column group equals the given pattern + * + * @param pattern The value to look for. + * @return true if pattern is found, else false + */ + @Override + public boolean containsValue(double pattern) { + for(int col = 0; col < _colIndexes.size(); col++) { + if(colContainsValue(col, pattern)) + return true; + } + return false; + } + + /** + * checks if any reconstructed value in column col equals the pattern for each segment, solves the m * x + b = + * pattern instead of scanning all rows + * + * @param col column index + * @param pattern the value to search for + * @return true if the pattern is found + */ + + private boolean colContainsValue(int col, double pattern) { + int[] breakpoints = breakpointsPerCol[col]; + double[] intercepts = interceptsPerCol[col]; + double[] slopes = slopesPerCol[col]; + for(int seg = 0; seg < breakpoints.length - 1; seg++) { + int start = breakpoints[seg]; + int len = breakpoints[seg + 1] - start; + if(len <= 0) + continue; + + double b = intercepts[seg]; + double m = slopes[seg]; + + if(m == 0.0) { + // constant segment: all values equal b + if(Double.compare(b, pattern) == 0) + return true; + continue; + } + + // check if pattern lies on the line: solve m*x + b = pattern for x + double x = (pattern - b) / m; + int xi = (int) x; + if(xi >= start && xi < start + len && Double.compare(m * xi + b, pattern) == 0) + return true; + } + return false; + } + + @Override + public AColGroup unaryOperation(UnaryOperator op) { + throw new NotImplementedException(); + } + + 
@Override + public AColGroup replace(double pattern, double replace) { + throw new NotImplementedException(); + } + + @Override + protected double computeMxx(double c, Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + protected void computeColMxx(double[] c, Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + protected void computeSumSq(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeColSumsSq(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeProduct(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) { + throw new NotImplementedException(); + + } + + @Override + protected void computeColProduct(double[] c, int nRows) { + throw new NotImplementedException(); + + } + + @Override + protected double[] preAggSumRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggSumSqRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggProductRows() { + throw new NotImplementedException(); + } + + @Override + protected double[] preAggBuiltinRows(Builtin builtin) { + throw new NotImplementedException(); + } + + @Override + public boolean sameIndexStructure(AColGroupCompressed that) { + throw new NotImplementedException(); + } + + @Override + protected void tsmm(double[] result, int numColumns, int nRows) { + throw new NotImplementedException(); + + } + + @Override + public AColGroup copyAndSet(IColIndex colIndexes) { + throw new 
NotImplementedException(); + } + + @Override + public void decompressToDenseBlockTransposed(DenseBlock db, int rl, int ru) { + throw new NotImplementedException(); + + } + + @Override + public void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int nColOut) { + throw new NotImplementedException(); + + } + + @Override + public void decompressToSparseBlock(SparseBlock sb, int rl, int ru, int offR, int offC) { + throw new NotImplementedException(); + + } + + @Override + public AColGroup rightMultByMatrix(MatrixBlock right, IColIndex allCols, int k) { + throw new NotImplementedException(); + } + + @Override + public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) { + throw new NotImplementedException(); + + } + + @Override + public void leftMultByAColGroup(AColGroup lhs, MatrixBlock result, int nRows) { + throw new NotImplementedException(); + + } + + @Override + public void tsmmAColGroup(AColGroup other, MatrixBlock result) { + throw new NotImplementedException(); + + } + + @Override + protected AColGroup sliceSingleColumn(int idx) { + throw new NotImplementedException(); + } + + @Override + protected AColGroup sliceMultiColumns(int idStart, int idEnd, IColIndex outputCols) { + throw new NotImplementedException(); + } + + @Override + public AColGroup sliceRows(int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + public long getNumberNonZeros(int nRows) { + throw new NotImplementedException(); + } + + @Override + public CmCovObject centralMoment(CMOperator op, int nRows) { + throw new NotImplementedException(); + } + + @Override + public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows) { + throw new NotImplementedException(); + } + + @Override + public double getCost(ComputationCostEstimator e, int nRows) { + throw new NotImplementedException(); + } + + @Override + public AColGroup append(AColGroup g) { + throw new NotImplementedException(); + } + + @Override + 
protected AColGroup appendNInternal(AColGroup[] groups, int blen, int rlen) { + throw new NotImplementedException(); + } + + @Override + public ICLAScheme getCompressionScheme() { + throw new NotImplementedException(); + } + + @Override + public AColGroup recompress() { + throw new NotImplementedException(); + } + + @Override + public CompressedSizeInfoColGroup getCompressionInfo(int nRow) { + throw new NotImplementedException(); + } + + @Override + protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) { + throw new NotImplementedException(); + } + + @Override + public AColGroup reduceCols() { + throw new NotImplementedException(); + } + + @Override + public double getSparsity() { + throw new NotImplementedException(); + } + + @Override + protected void sparseSelection(MatrixBlock selection, ColGroupUtils.P[] points, MatrixBlock ret, int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + protected void denseSelection(MatrixBlock selection, ColGroupUtils.P[] points, MatrixBlock ret, int rl, int ru) { + throw new NotImplementedException(); + } + + @Override + public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) { + throw new NotImplementedException(); + } + +} + diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java new file mode 100644 index 0000000000..7b0b4bfa96 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java @@ -0,0 +1,306 @@ +package org.apache.sysds.runtime.compress.colgroup.functional; +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class PiecewiseLinearUtils { + /** + * Utility methods for piecewise 
linear compression of matric columns + * supports compression used the segmented least squares algorithm which is implemented with dynamic programming + * and a successive method, which puts all values in a segment till the target loss is exceeded + */ + + private PiecewiseLinearUtils() { + + } + + public static final class SegmentedRegression { + private final int[] breakpoints; + private final double[] slopes; + private final double[] intercepts; + + public SegmentedRegression(int[] breakpoints, double[] slopes, double[] intercepts) { + this.breakpoints = breakpoints; + this.slopes = slopes; + this.intercepts = intercepts; + } + + public int[] getBreakpoints() { + return breakpoints; + } + + public double[] getSlopes() { + return slopes; + } + + public double[] getIntercepts() { + return intercepts; + } + } + + public static double[] getColumn(MatrixBlock in, int colIndex) { + final int numRows = in.getNumRows(); + final double[] column = new double[numRows]; + + for(int row = 0; row < numRows; row++) { + column[row] = in.get(row, colIndex); + } + return column; + } + + public static SegmentedRegression compressSegmentedLeastSquares(double[] column, CompressionSettings cs) { + //compute Breakpoints for a Column with dynamic Programming + final List<Integer> breakpointsList = computeBreakpoints(cs, column); + final int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray(); + + //get values for Regression + final int numSeg = breakpoints.length - 1; + final double[] slopes = new double[numSeg]; + final double[] intercepts = new double[numSeg]; + + // Regress per Segment + for(int seg = 0; seg < numSeg; seg++) { + final int SegStart = breakpoints[seg]; + final int SegEnd = breakpoints[seg + 1]; + + final double[] line = regressSegment(column, SegStart, SegEnd); + slopes[seg] = line[0]; //slope regession line + intercepts[seg] = line[1]; //intercept regression line + } + + return new SegmentedRegression(breakpoints, slopes, intercepts); + } + 
+ public static SegmentedRegression compressSuccessivePiecewiseLinear(double[] column, CompressionSettings cs) { + //compute Breakpoints for a Column with a sukzessive breakpoints algorithm + + final List<Integer> breakpointsList = computeBreakpointSuccessive(column, cs); + final int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray(); + + //get values for Regression + final int numSeg = breakpoints.length - 1; + final double[] slopes = new double[numSeg]; + final double[] intercepts = new double[numSeg]; + + // Regress per Segment + for(int seg = 0; seg < numSeg; seg++) { + final int segstart = breakpoints[seg]; + final int segEnd = breakpoints[seg + 1]; + final double[] line = regressSegment(column, segstart, segEnd); + slopes[seg] = line[0]; + intercepts[seg] = line[1]; + } + return new SegmentedRegression(breakpoints, slopes, intercepts); + } + + /** + * Computes breakpoints for a column using segmented least squares with dynamic programming + * Iteratively reduces lambda to increase the number of segments until the target MSE is met. 
+ * + * @param cs compression settings containing the target loss + * @param column the column values to segment + * @return list of breakpoint indices, starting with 0 + */ + public static List<Integer> computeBreakpoints(CompressionSettings cs, double[] column) { + final int numElements = column.length; + final double targetMSE = cs.getPiecewiseTargetLoss(); + final double sseMax = numElements * targetMSE; // max allowed total SSE + + //start with high lambda an reduce iteratively + double lambda = Math.max(10.0, sseMax * 2.0); + List<Integer> bestBreaks = Arrays.asList(0, numElements); + double bestSSE = computeTotalSSE(column, bestBreaks); + + for (int iter = 0; iter < 50; iter++) { + List<Integer> breaks = computeBreakpointsLambda(column, lambda); + double totalSSE = computeTotalSSE(column, breaks); + int numSegs = breaks.size() - 1; + + if (totalSSE < bestSSE) { + bestSSE = totalSSE; + bestBreaks = new ArrayList<>(breaks); + } + //target loss reached + if (bestSSE <= sseMax) { + return bestBreaks; + } + + // only one segment left, break condition + if (numSegs <= 1) { + break; + } + // reducing lambda to allow more segments in next iteration + lambda *= 0.8; + } + + return bestBreaks; + } + + /** + * Computes optimal breakpoints, each segment has a SEE plus a + + */ + + public static List<Integer> computeBreakpointsLambda(double[] column, double lambda) { + final int n = column.length; + final double[] costs = new double[n + 1]; // min cost to reach i + final int[] prev = new int[n + 1]; + + Arrays.fill(costs, Double.POSITIVE_INFINITY); + costs[0] = 0.0; + // precompute all segment costs to avoid recomputation in dynamic programming + double[][] segCosts = new double[n+1][n+1]; + for(int i = 0; i < n; i++) { + for(int j = i+1; j <= n; j++) { + segCosts[i][j] = computeSegmentCost(column, i, j); + } + } + // for each point j, find the cheapest previous breakpoint i + for(int j = 1; j <= n; j++) { + for(int i = 0; i < j; i++) { + // cost equals the SSE of 
segment [i,j] plus penalty plus best costs + double cost = costs[i] + segCosts[i][j] + lambda; + if(cost < costs[j]) { + costs[j] = cost; + prev[j] = i; + } + } + } + + // Backtrack to previous points to recover the breakpoints + List<Integer> breaks = new ArrayList<>(); + int j = n; + while(j > 0) { + breaks.add(j); + j = prev[j]; + } + breaks.add(0); + Collections.reverse(breaks); + return breaks; + } + + /** + * computes the segment cost + * @param column column values + * @param start start index + * @param end end index + * @return SSE of the regression line over the segment + */ + public static double computeSegmentCost(double[] column, int start, int end) { + final int segSize = end - start; + if(segSize <= 1) + return 0.0; + + final double[] ab = regressSegment(column, start, end); + final double slope = ab[0]; + final double intercept = ab[1]; + + double sse = 0.0; + for(int i = start; i < end; i++) { + double err = column[i] - (slope * i + intercept); + sse += err * err; + } + return sse; + } + + /** + * computes the total SSE over all segments defined by the given breakpoints + * @param column + * @param breaks + * @return sum of the total SSE + */ + public static double computeTotalSSE(double[] column, List<Integer> breaks) { + double total = 0.0; + for(int s = 0; s < breaks.size() - 1; s++) { + final int start = breaks.get(s); + final int end = breaks.get(s + 1); + total += computeSegmentCost(column, start, end); + } + return total; + } + + public static double[] regressSegment(double[] column, int start, int end) { + final int numElements = end - start; + if(numElements <= 0) + return new double[] {0.0, 0.0}; + + double sumOfRowIndices = 0, sumOfColumnValues = 0, sumOfRowIndicesSquared = 0, productRowIndexTimesColumnValue = 0; + for(int i = start; i < end; i++) { + sumOfRowIndices += i; + sumOfColumnValues += column[i]; + sumOfRowIndicesSquared += i * i; + productRowIndexTimesColumnValue += i * column[i]; + } + + + final double denominatorForSlope = + 
numElements * sumOfRowIndicesSquared - sumOfRowIndices * sumOfRowIndices; + final double slope; + final double intercept; + if(denominatorForSlope == 0) { + slope = 0.0; + intercept = sumOfColumnValues / numElements; + } + else { + slope = (numElements * productRowIndexTimesColumnValue - sumOfRowIndices * sumOfColumnValues) / + denominatorForSlope; + intercept = (sumOfColumnValues - slope * sumOfRowIndices) / numElements; + } + return new double[] {slope, intercept}; + } + + /** + * computes breakpoints for a column using a successive algorithm + * extends each segment until the SEE reaches the target loss, then start a new segment + * @param column column values + * @param cs compression setting for setting the target loss + * @return list of breakpoint indices + */ + public static List<Integer> computeBreakpointSuccessive(double[] column, CompressionSettings cs) { + final int numElements = column.length; + final double targetMSE = cs.getPiecewiseTargetLoss(); + if (Double.isNaN(targetMSE) || targetMSE <= 0) { + return Arrays.asList(0, numElements); // fallback single segment + } + + List<Integer> breakpoints = new ArrayList<>(); + breakpoints.add(0); + int currentStart = 0; + + while (currentStart < numElements) { + int bestEnd = -1; // no end found + + for (int end = currentStart + 1; end <= numElements; end++) { + double sse = computeSegmentCost(column, currentStart, end); + if(sse > (end - currentStart) * targetMSE) { + // end-1 is last valid end; if end == segStart+1 force min segment of length 1 + bestEnd = (end == currentStart + 1) ? 
end : end - 1; + break; + } + } + + if (bestEnd == -1) { + bestEnd = numElements;// all remaining points fitting within budget + } + + // safety guard not allow zero segments + if (bestEnd <= currentStart) { + bestEnd = Math.min(currentStart + 1, numElements); + } + + breakpoints.add(bestEnd); + currentStart = bestEnd; + } + + // make sure, that the last breakpoint equals numElements + int last = breakpoints.get(breakpoints.size() - 1); + if (last != numElements) { + breakpoints.add(numElements); + } + + return breakpoints; + } +} diff --git a/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java b/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java new file mode 100644 index 0000000000..6046bdfb20 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java @@ -0,0 +1,168 @@ +package org.apache.sysds.performance; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.utils.stats.Timing; +import java.util.Random; + +/** + * Performance benchmark for piecewise linear compression. + * Successive is benchmarked across large matrices to show scalability. 
+ * DP is only used as a quality reference on small matrices due to quadratic complexity + + */ +public class PiecewiseLinearCompressionPerformanceTest { + + //different target losses : loose, avg, strict + private static final double[] LOSSES = {1e-1, 1e-2, 1e-4}; + // how often compressed + private static final int REPS = 3; + + /** + * generate of a time series matrix to have a realistic test set up + * @param nr number of rows + * @param nc number of columns + * @return matrix with random generated data + */ + private static MatrixBlock generateTestMatrix(int nr, int nc) { + MatrixBlock mb = new MatrixBlock(nr, nc, true); + mb.allocateDenseBlock(); + Random rng = new Random(42); + for(int c = 0; c < nc; c++) { + double trend = 0.001 * c; + double level = rng.nextDouble() * 5.0; + double volatility = 0.1 + 0.01 * c; + double residual = 0.0; + + for(int row = 0; row < nr; row++) { + // random level shift every 75-150 rows + if(row % (75 + (int)(75 * rng.nextDouble())) == 0) { + level += (rng.nextDouble() - 0.5) * 2.0; + trend += (rng.nextDouble() - 0.5) * 0.0005; + } + // noise: residual = 0.7 * prev + random + residual = 0.7 * residual + rng.nextGaussian() * volatility; + mb.set(row, c, Math.max(0, trend * row + level + residual)); + } + } + return mb; + } + /// returns a average number of segments per column + private static double avgSegments(AColGroup cg) { + int[][] breakpoints = ((ColGroupPiecewiseLinearCompressed) cg).getBreakpointsPerCol(); + int total = 0; + for(int[] bp : breakpoints) total += bp.length - 1; + return total / (double) breakpoints.length; + } + + /** + * computes MSE between the compression, the original data and decompression + * @param orig original matrix + * @param cg piecewise linear compressed column group + * @return MSE + */ + private static double reconstructionMSE(MatrixBlock orig, AColGroup cg) { + int nr = orig.getNumRows(), nc = orig.getNumColumns(); + MatrixBlock recon = new MatrixBlock(nr, nc, false); + 
recon.allocateDenseBlock(); + cg.decompressToDenseBlock(recon.getDenseBlock(), 0, nr, 0, 0); + double sse = 0; + for(int r = 0; r < nr; r++) + for(int c = 0; c < nc; c++) { + double diff = orig.get(r, c) - recon.get(r, c); + sse += diff * diff; + } + return sse / (nr * nc); + } + + /** + * benchmarks successive compression for a given matrix and target loss + * reports segments, compressed data size, runtime and reconstruction + * @param mb original matrix to compress + * @param loss target loss param + */ + private static void benchmarkSuccessive(MatrixBlock mb, double loss) { + long origSize = mb.getInMemorySize(); + int numRows = mb.getNumRows(), numCol = mb.getNumColumns(); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(loss); + IColIndex colIndexes = ColIndexFactory.create(numCol); + + ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, mb, cs); + + Timing t = new Timing(); + AColGroup cg = null; + t.start(); + for(int i = 0; i < REPS; i++) + cg = ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, mb, cs); + double time = t.stop() / REPS; + + long size = cg.getExactSizeOnDisk(); + String saving = size < origSize + ? 
String.format("saved %3.0f%%", 100.0 - 100.0 * size / origSize) + : String.format("larger +%.0f%%", 100.0 * size / origSize - 100); + + System.out.printf(" successive loss=%.0e %5.1f segs %6.2f MB (%s) %6.1f ms MSE=%.2e%n", + loss, avgSegments(cg), size / 1e6, saving, time, reconstructionMSE(mb, cg)); + } + + /** + * benchmarks dynamic programming compression for a given matrix and target loss + * no repetition, because DP is too slow due complexity + * reports segments, compressed data size, runtime and reconstruction + * @param mb original matrix to compress + * @param loss target loss param + */ + private static void benchmarkDP(MatrixBlock mb, double loss) { + long origSize = mb.getInMemorySize(); + int numColumns = mb.getNumColumns(); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(loss); + IColIndex colIndexes = ColIndexFactory.create(numColumns); + + Timing t = new Timing(); + t.start(); + AColGroup cg = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, mb, cs); + double time = t.stop(); + + long size = cg.getExactSizeOnDisk(); + String saving = size < origSize + ? 
String.format("saved %3.0f%%", 100.0 - 100.0 * size / origSize) + : String.format("LARGER +%.0f%%", 100.0 * size / origSize - 100); + + System.out.printf(" DP loss=%.0e %5.1f segs %6.2f MB (%s) %6.1f ms MSE=%.2e%n", + loss, avgSegments(cg), size / 1e6, saving, time, reconstructionMSE(mb, cg)); + } + + public static void main(String[] args) { + System.out.println("=== Piecewise Linear Compression Benchmark ===\n"); + + // Successive scalability across large matrices + System.out.println("=== Successive: scalability ==="); + int[][] configs = {{1000, 10}, {1000, 100}, {1000, 500}, + {5000, 10}, {5000, 100}, {5000, 500}, + {10000, 10}, {10000, 100}, {10000, 500}}; + + for(int[] cfg : configs) { + int nr = cfg[0], nc = cfg[1]; + MatrixBlock mb = generateTestMatrix(nr, nc); + System.out.printf("%nnrows=%d ncols=%d original=%.2f MB%n", + nr, nc, mb.getInMemorySize() / 1e6); + for(double loss : LOSSES) + benchmarkSuccessive(mb, loss); + } + + // DP quality reference on small matrix + System.out.println("\n=== DP: quality reference (nrows=1000, ncols=10) ==="); + MatrixBlock mbSmall = generateTestMatrix(1000, 10); + System.out.printf("original=%.2f MB%n", mbSmall.getInMemorySize() / 1e6); + for(double loss : LOSSES) + benchmarkDP(mbSmall, loss); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java new file mode 100644 index 0000000000..53ae3a1277 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java @@ -0,0 +1,308 @@ +package org.apache.sysds.test.component.compress.colgroup; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import 
org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.functionobjects.Divide; +import org.apache.sysds.runtime.functionobjects.Minus; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.Plus; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.RightScalarOperator; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.test.AutomatedTestBase; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for ColGroupPiecewiseLinearCompressed operations containing: scalarOperation, binaryRowOps, computeSum, + * containsValue, getIdx, getExactSizeOnDisk. 
+ */ +public class ColGroupPiecewiseLinearCompressedOperationsTest extends AutomatedTestBase { + + private static final long SEED = 42L; + private static final int NROWS = 50; + private static final int NCOLS = 3; + private static final double TARGET_LOSS = 1e-8; + private static final double DELTA = 1e-9; + + private ColGroupPiecewiseLinearCompressed piecewiseLinearColGroup; + private MatrixBlock orignalMB; + private MatrixBlock decompressedMB; + private IColIndex colIndexes; + private int numRows; + private int numCols; + + @Before + public void setUp() { + numRows = NROWS; + numCols = NCOLS; + + /// generate random matrix + double[][] data = getRandomMatrix(numRows, numCols, -3, 3, 1.0, SEED); + orignalMB = DataConverter.convertToMatrixBlock(data); + orignalMB.allocateDenseBlock(); + + colIndexes = ColIndexFactory.create(buildColArray(numCols)); + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(TARGET_LOSS); + + /// create ColGroupPiecewiseLinearCompressed instance + AColGroup result = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, orignalMB, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + piecewiseLinearColGroup = (ColGroupPiecewiseLinearCompressed) result; + + /// decompress again + decompressedMB = decompress(piecewiseLinearColGroup); + } + + private MatrixBlock decompress(AColGroup cg) { + MatrixBlock mb = new MatrixBlock(numRows, numCols, false); + mb.allocateDenseBlock(); + cg.decompressToDenseBlock(mb.getDenseBlock(), 0, numRows, 0, 0); + return mb; + } + + /// check elementwise to compare results from compressed and decompressed matrixblock + private void checkMatrixEquals(String msg, MatrixBlock mb1, MatrixBlock mb2) { + if(mb1.getNumRows() != mb2.getNumRows() || mb1.getNumColumns() != mb2.getNumColumns()) + fail(msg + " dimension mismatch"); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + assertEquals(msg + "[" + r + "," + c + "]", 
mb1.get(r, c), mb2.get(r, c), DELTA); + } + + /// compute column sum to validate + private double[] computeSums(MatrixBlock mb) { + double[] sums = new double[numCols]; + for(int c = 0; c < numCols; c++) + for(int r = 0; r < numRows; r++) + sums[c] += mb.get(r, c); + return sums; + } + + /// create row vector + private double[] buildRowVector() { + double[] v = new double[numCols]; + for(int i = 0; i < numCols; i++) + v[i] = 0.5 * (i + 1); + return v; + } + + private int[] buildColArray(int n) { + int[] cols = new int[n]; + for(int i = 0; i < n; i++) + cols[i] = i; + return cols; + } + + private MatrixBlock applyBinaryRowOpLeft(MatrixBlock mb, BinaryOperator op, double[] v) { + MatrixBlock result = new MatrixBlock(numRows, numCols, false); + result.allocateDenseBlock(); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + result.getDenseBlock().set(r, c, op.fn.execute(v[c], mb.get(r, c))); + return result; + } + + private MatrixBlock applyBinaryRowOpRight(MatrixBlock mb, BinaryOperator op, double[] v) { + MatrixBlock result = new MatrixBlock(numRows, numCols, false); + result.allocateDenseBlock(); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + result.getDenseBlock().set(r, c, op.fn.execute(mb.get(r, c), v[c])); + return result; + } + + @Test + public void testComputeSum() { + double[] sumsComp = new double[numCols]; + piecewiseLinearColGroup.computeSum(sumsComp, numRows); + assertArrayEquals(sumsComp, computeSums(decompressedMB), DELTA); + } + + @Test + public void testComputeColSums() { + double[] sumsComp = new double[numCols]; + piecewiseLinearColGroup.computeColSums(sumsComp, numRows); + assertArrayEquals(sumsComp, computeSums(decompressedMB), DELTA); + } + + private void testScalarOp(ScalarOperator op, double scalar) { + MatrixBlock expected = new MatrixBlock(numRows, numCols, false); + expected.allocateDenseBlock(); + for(int r = 0; r < numRows; r++) + for(int c = 0; c < numCols; c++) + 
expected.getDenseBlock().set(r, c, op.fn.execute(decompressedMB.get(r, c), scalar)); + + checkMatrixEquals("scalarOp " + op.fn.getClass().getSimpleName(), expected, + decompress(piecewiseLinearColGroup.scalarOperation(op))); + } + + @Test + public void testScalarPlus() { + testScalarOp(new RightScalarOperator(Plus.getPlusFnObject(), 3.7), 3.7); + } + + @Test + public void testScalarMinus() { + testScalarOp(new RightScalarOperator(Minus.getMinusFnObject(), 1.5), 1.5); + } + + @Test + public void testScalarMultiply() { + testScalarOp(new RightScalarOperator(Multiply.getMultiplyFnObject(), 2.0), 2.0); + } + + @Test + public void testScalarDivide() { + testScalarOp(new RightScalarOperator(Divide.getDivideFnObject(), 4.0), 4.0); + } + + @Test + public void testBinaryRowOpLeftPlus() { + BinaryOperator op = new BinaryOperator(Plus.getPlusFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpLeft Plus", applyBinaryRowOpLeft(decompressedMB, op, v), + decompress(piecewiseLinearColGroup.binaryRowOpLeft(op, v, false))); + } + + @Test + public void testBinaryRowOpLeftMultiply() { + BinaryOperator op = new BinaryOperator(Multiply.getMultiplyFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpLeft Multiply", applyBinaryRowOpLeft(decompressedMB, op, v), + decompress(piecewiseLinearColGroup.binaryRowOpLeft(op, v, false))); + } + + @Test + public void testBinaryRowOpRightMinus() { + BinaryOperator op = new BinaryOperator(Minus.getMinusFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpRight Minus", applyBinaryRowOpRight(decompressedMB, op, v), + decompress(piecewiseLinearColGroup.binaryRowOpRight(op, v, false))); + } + + @Test + public void testBinaryRowOpRightDivide() { + BinaryOperator op = new BinaryOperator(Divide.getDivideFnObject()); + double[] v = buildRowVector(); + checkMatrixEquals("binaryRowOpRight Divide", applyBinaryRowOpRight(decompressedMB, op, v), + 
decompress(piecewiseLinearColGroup.binaryRowOpRight(op, v, false))); + } + + @Test + public void testContainsValueIntercept() { + double pattern = piecewiseLinearColGroup.getInterceptsPerCol()[0][0]; + assertTrue("intercept of col 0 seg 0 should exist", piecewiseLinearColGroup.containsValue(pattern)); + } + + @Test + public void testContainsValueEndpoint() { + int[] breakpoints = piecewiseLinearColGroup.getBreakpointsPerCol()[0]; + double[] intercepts = piecewiseLinearColGroup.getInterceptsPerCol()[0]; + double[] slopes = piecewiseLinearColGroup.getSlopesPerCol()[0]; + if(breakpoints.length > 1) { + double pattern = intercepts[0] + slopes[0] * (breakpoints[1] - breakpoints[0] - 1); + assertTrue("endpoint of col 0 seg 0 should exist", piecewiseLinearColGroup.containsValue(pattern)); + } + } + + @Test + public void testContainsValueConstantSegment() { + ColGroupPiecewiseLinearCompressed cg = (ColGroupPiecewiseLinearCompressed) ColGroupPiecewiseLinearCompressed.create( + ColIndexFactory.create(new int[] {0}), new int[][] {{0, numRows}}, new double[][] {{0.0}}, + new double[][] {{1.23}}, numRows); + + assertTrue("constant value 1.23 should exist", cg.containsValue(1.23)); + assertFalse("value 2.0 should not exist", cg.containsValue(2.0)); + } + + @Test + public void testContainsValueOutsideRange() { + assertFalse("value -10 outside data range", piecewiseLinearColGroup.containsValue(-10.0)); + assertFalse("value +10 outside data range", piecewiseLinearColGroup.containsValue(10.0)); + } + + @Test + public void testGetIdxMatchesDecompress() { + for(int c = 0; c < numCols; c++) + for(int r = 0; r < numRows; r++) + assertEquals("getIdx(" + r + "," + c + ")", decompressedMB.get(r, c), + piecewiseLinearColGroup.getIdx(r, c), 1e-10); + } + + @Test + public void testGetIdxInvalidBounds() { + assertEquals("row < 0", 0.0, piecewiseLinearColGroup.getIdx(-1, 0), DELTA); + assertEquals("row >= numRows", 0.0, piecewiseLinearColGroup.getIdx(numRows, 0), DELTA); + assertEquals("col < 
0", 0.0, piecewiseLinearColGroup.getIdx(0, -1), DELTA); + assertEquals("col >= ncols", 0.0, piecewiseLinearColGroup.getIdx(0, numCols), DELTA); + } + + @Test + public void testGetNumValues() { + int expected = 0; + for(int c = 0; c < numCols; c++) { + int breakpointsLen = piecewiseLinearColGroup.getBreakpointsPerCol()[c].length; + int slopesLen = piecewiseLinearColGroup.getSlopesPerCol()[c].length; + int interceptsLen = piecewiseLinearColGroup.getInterceptsPerCol()[c].length; + assertEquals("breakpoints != slopes+1 for col " + c, breakpointsLen, slopesLen + 1); + assertEquals("slopes != intercepts for col " + c, slopesLen, interceptsLen); + expected += breakpointsLen + slopesLen + interceptsLen; + } + assertEquals("getNumValues() mismatch", expected, piecewiseLinearColGroup.getNumValues()); + } + + @Test + public void testGetExactSizeOnDisk() { + Random rng = new Random(SEED); + int rows = 80 + rng.nextInt(40); + int numSegs = 1 + rng.nextInt(3); + + int[] breakpoints = new int[numSegs + 1]; + breakpoints[0] = 0; + breakpoints[numSegs] = rows; + for(int s = 1; s < numSegs; s++) + breakpoints[s] = rng.nextInt(rows * 2 / 3) + rows / 10; + + double[] slopes = new double[numSegs]; + double[] intercepts = new double[numSegs]; + for(int s = 0; s < numSegs; s++) { + slopes[s] = rng.nextDouble() * 4 - 2; + intercepts[s] = rng.nextDouble() * 4 - 2; + } + /// PLC Piecewise Linear Compressed + AColGroup colGroupPLC = ColGroupPiecewiseLinearCompressed.create( + ColIndexFactory.create(new int[] {rng.nextInt(20)}), new int[][] {breakpoints}, new double[][] {slopes}, + new double[][] {intercepts}, rows); + + assertTrue("disk size should be positive", colGroupPLC.getExactSizeOnDisk() > 0); + assertTrue("num values should be positive", colGroupPLC.getNumValues() > 0); + } + + @Override + public double[][] getRandomMatrix(int rows, int cols, double min, double max, double sparsity, long seed) { + Random rng = new Random(seed); + double[][] data = new double[rows][cols]; + for(int r 
= 0; r < rows; r++) + for(int c = 0; c < cols; c++) + data[r][c] = min + rng.nextDouble() * (max - min); + return data; + } +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java new file mode 100644 index 0000000000..e05745bc97 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java @@ -0,0 +1,455 @@ +package org.apache.sysds.test.component.compress.colgroup; + +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed; +import org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.compress.estim.EstimationFactors; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.test.AutomatedTestBase; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * Unit tests of ColGroupPiecewiseLinearCompression Covers Validation, Compression and decompression + */ +public class ColGroupPiecewiseLinearCompressedTest extends AutomatedTestBase { + + private static final long SEED = 42L; + + 
@Override + public void setUp() { + + } + + @Test(expected = NullPointerException.class) + public void testCreateNullBreakpoints() { + IColIndex cols = ColIndexFactory.create(new int[] {0}); + int[][] nullBp = {null}; + ColGroupPiecewiseLinearCompressed.create(cols, nullBp, new double[][] {{1.0}}, new double[][] {{0.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateTooFewBreakpoints() { + int[][] singleBp = {new int[] {0}}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), singleBp, + new double[][] {new double[] {1.0}}, new double[][] {new double[] {0.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateInconsistentSlopes() { + int[] bp = {0, 5, 10}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), new int[][] {bp}, + new double[][] {new double[] {1.0, 2.0, 3.0}}, new double[][] {new double[] {0.0, 1.0}}, 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateInconsistentIntercepts() { + int[] bp = {0, 5, 10}; + ColGroupPiecewiseLinearCompressed.create(ColIndexFactory.create(new int[] {0}), new int[][] {bp}, + new double[][] {new double[] {1.0, 2.0}}, new double[][] {new double[] {0.0}}, 10); + } + + @Test + public void testCompressAndDecompressDP() { + + // create random matrix + final int nrows = 50, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -3, 3, 1.0, SEED); + MatrixBlock in = DataConverter.convertToMatrixBlock(data); + in.allocateDenseBlock(); + + IColIndex colIndexes = ColIndexFactory.create(new int[] {0, 1, 2}); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1e-8); + + AColGroup result = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, in, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + 
// check the structure + int[][] breakpoints = plGroup.getBreakpointsPerCol(); + double[][] slopes = plGroup.getSlopesPerCol(); + double[][] intercepts = plGroup.getInterceptsPerCol(); + + assertEquals("wrong number of columns in breakpoints", ncols, breakpoints.length); + for(int c = 0; c < ncols; c++) { + assertTrue("breakpoints[" + c + "] needs at least 2 entries", breakpoints[c].length >= 2); + assertEquals("breakpoints[" + c + "] must start at 0", 0, breakpoints[c][0]); + assertEquals("breakpoints[" + c + "] must end at nrows", nrows, breakpoints[c][breakpoints[c].length - 1]); + int numSegs = breakpoints[c].length - 1; + assertEquals("slopes[" + c + "] length mismatch", numSegs, slopes[c].length); + assertEquals("intercepts[" + c + "] length mismatch", numSegs, intercepts[c].length); + } + + // decompress and check reconstruction of column group + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, 0); + DenseBlock db = recon.getDenseBlock(); + + for(int r = 0; r < nrows; r++) { + for(int c = 0; c < ncols; c++) { + double val = db.get(r, c); + assertFalse("NaN at [" + r + "," + c + "]", Double.isNaN(val)); + assertFalse("Infinite at [" + r + "," + c + "]", Double.isInfinite(val)); + assertEquals("reconstruction error too large at [" + r + "," + c + "]", data[r][c], val, 1e-6); + } + } + } + + @Test + public void testCompressAndDecompressSuccessive() { + + //create random matrix + final int nrows = 50, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -3, 3, 1.0, SEED); + MatrixBlock in = DataConverter.convertToMatrixBlock(data); + in.allocateDenseBlock(); + + IColIndex colIndexes = ColIndexFactory.create(new int[] {0, 1, 2}); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1e-8); + + // create ColGroupPiecewiseLinearCompressed with successive compression + AColGroup result = 
ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, in, cs); + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + // structure checks + int[][] bp = plGroup.getBreakpointsPerCol(); + double[][] slopes = plGroup.getSlopesPerCol(); + double[][] intercepts = plGroup.getInterceptsPerCol(); + + assertEquals("wrong number of columns in bp", ncols, bp.length); + for(int c = 0; c < ncols; c++) { + assertTrue("bp[" + c + "] needs at least 2 entries", bp[c].length >= 2); + assertEquals("bp[" + c + "] must start at 0", 0, bp[c][0]); + assertEquals("bp[" + c + "] must end at nrows", nrows, bp[c][bp[c].length - 1]); + int numSegs = bp[c].length - 1; + assertEquals("slopes[" + c + "] length mismatch", numSegs, slopes[c].length); + assertEquals("intercepts[" + c + "] length mismatch", numSegs, intercepts[c].length); + } + + // validate decompression + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, 0); + DenseBlock db = recon.getDenseBlock(); + + for(int r = 0; r < nrows; r++) { + for(int c = 0; c < ncols; c++) { + double val = db.get(r, c); + assertFalse("NaN at [" + r + "," + c + "]", Double.isNaN(val)); + assertFalse("Infinite at [" + r + "," + c + "]", Double.isInfinite(val)); + assertEquals("reconstruction error too large at [" + r + "," + c + "]", data[r][c], val, 1e-6); + } + } + } + + /// Wrapper-Classes: Test setup for DP and successive compression + + private void testRoundtripDP(double[][] data, int nrows, int ncols, double targetLoss, double tolerance, + int maxFailures) { + testRoundtrip(data, nrows, ncols, targetLoss, tolerance, maxFailures, false); + } + + private void testRoundtripSuccessive(double[][] data, int nrows, int ncols, double targetLoss, double tolerance, + int maxFailures) { + testRoundtrip(data, nrows, ncols, 
targetLoss, tolerance, maxFailures, true); + } + + /** + * Set test setup: converting data in matrix block, set compression setting does compression, decompression, + * validation + */ + private void testRoundtrip(double[][] data, int nrows, int ncols, double targetLoss, double tolerance, + int maxFailures, boolean successive) { + + ///create a matrix + MatrixBlock orig = DataConverter.convertToMatrixBlock(data); + orig.allocateDenseBlock(); + + IColIndex colIndexes = ColIndexFactory.create(buildColArray(ncols)); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(targetLoss); + + /// choose compression + AColGroup result = successive ? ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, orig, + cs) : ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, orig, cs); + + assertTrue(result instanceof ColGroupPiecewiseLinearCompressed); + ColGroupPiecewiseLinearCompressed plGroup = (ColGroupPiecewiseLinearCompressed) result; + + /// structure checks + checkStructure(plGroup, nrows, ncols); + + /// decompression check + MatrixBlock recon = new MatrixBlock(nrows, ncols, false); + recon.allocateDenseBlock(); + plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 0, 0); + DenseBlock db = recon.getDenseBlock(); + + int failures = 0; + for(int r = 0; r < nrows; r++) { + for(int c = 0; c < ncols; c++) { + double val = db.get(r, c); + assertFalse("NaN at [" + r + "," + c + "]", Double.isNaN(val)); + assertFalse("Infinite at [" + r + "," + c + "]", Double.isInfinite(val)); + if(Math.abs(data[r][c] - val) > tolerance) + failures++; + } + } + assertTrue("too many reconstruction failures: " + failures, failures <= maxFailures); + } + + private void checkStructure(ColGroupPiecewiseLinearCompressed plGroup, int nrows, int ncols) { + int[][] breakpoints = plGroup.getBreakpointsPerCol(); + double[][] slopes = plGroup.getSlopesPerCol(); + double[][] intercepts = plGroup.getInterceptsPerCol(); + + 
assertEquals("wrong number of columns in breakpoints", ncols, breakpoints.length); + assertEquals("wrong number of col indices", ncols, plGroup.getColIndices().size()); + + for(int c = 0; c < ncols; c++) { + assertTrue("breakpoints[" + c + "] needs at least 2 entries", breakpoints[c].length >= 2); + assertEquals("breakpoints[" + c + "] must start at 0", 0, breakpoints[c][0]); + assertEquals("breakpoints[" + c + "] must end at nrows", nrows, breakpoints[c][breakpoints[c].length - 1]); + int numSegs = breakpoints[c].length - 1; + assertEquals("slopes[" + c + "] length mismatch", numSegs, slopes[c].length); + assertEquals("intercepts[" + c + "] length mismatch", numSegs, intercepts[c].length); + } + } + + private double[][] buildMultiSegmentData(int nrows, int ncols) { + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + int[] segStarts = {0, 15, 30, 45, 60}; + double[] slopes = {0.5, -1.2, 2.0, -0.8}; + + for(int c = 0; c < ncols; c++) { + double offset = c; + for(int r = 0; r < nrows; r++) { + int seg = 0; + while(seg < segStarts.length - 1 && r >= segStarts[seg + 1]) + seg++; + data[r][c] = slopes[seg] * (r - segStarts[seg]) + offset + rng.nextGaussian() * 0.8; + offset += 0.01; + } + } + return data; + } + + private int[] buildColArray(int ncols) { + int[] cols = new int[ncols]; + for(int i = 0; i < ncols; i++) + cols[i] = i; + return cols; + } + + @Test + public void testTrendWithNoise() { + final int nrows = 100, ncols = 2; + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + for(int r = 0; r < nrows; r++) { + double trend = 0.05 * r; + for(int c = 0; c < ncols; c++) + data[r][c] = trend + rng.nextGaussian() * 1.5 + c * 2.0; + } + testRoundtripDP(data, nrows, ncols, 1.0, 4.0, 45); + testRoundtripSuccessive(data, nrows, ncols, 1.0, 4.0, 45); + } + + @Test + public void testAbruptJumps() { + final int nrows = 80, ncols = 3; + double[][] data = getRandomMatrix(nrows, ncols, -2, 2, 1.0, SEED); + for(int c = 0; 
c < ncols; c++) { + for(int r = 25; r < 55; r++) + data[r][c] += 8.0; + for(int r = 55; r < nrows; r++) + data[r][c] += 15.0; + } + // successive needs looser tolerance on jumps + testRoundtripDP(data, nrows, ncols, 5.0, 10.0, 50); + testRoundtripSuccessive(data, nrows, ncols, 25.0, 18.0, 55); + } + + @Test + public void testHighFrequency() { + final int nrows = 100, ncols = 50; + Random rng = new Random(SEED); + double[][] data = new double[nrows][ncols]; + for(int r = 0; r < nrows; r++) { + double sine = Math.sin(r * 0.4) * 4.0; + for(int c = 0; c < ncols; c++) + data[r][c] = sine + rng.nextGaussian() * 0.8 + Math.sin(r * 0.2 + c) * 2.0; + } + // both struggle with high frequency; successive slightly worse + testRoundtripDP(data, nrows, ncols, 2.0, 2.0, 3500); + testRoundtripSuccessive(data, nrows, ncols, 2.0, 2.5, 2500); + } + + @Test + public void testLowVarianceSingleColumn() { + double[][] data = getRandomMatrix(50, 1, -1, 1, 0.3, SEED); + testRoundtripDP(data, 50, 1, 0.1, 0.5, 5); + testRoundtripSuccessive(data, 50, 1, 0.05, 0.4, 3); + } + + @Test + public void testSingleColumn() { + double[][] data = getRandomMatrix(50, 1, -1, 1, 1.0, SEED); + testRoundtripDP(data, 50, 1, 0.5, 1.0, 8); + testRoundtripSuccessive(data, 50, 1, 0.5, 1.0, 8); + } + + @Test + public void testKnownSegmentBoundaries() { + final int nrows = 60, ncols = 2; + double[][] data = buildMultiSegmentData(nrows, ncols); + // successive needs slightly higher targetLoss for same data + testRoundtripDP(data, nrows, ncols, 0.8, 5.0, 35); + testRoundtripSuccessive(data, nrows, ncols, 1.0, 5.0, 35); + } + + @Test + public void testMultipleColumns() { + double[][] data = getRandomMatrix(80, 5, -5, 5, 1.5, SEED); + testRoundtripDP(data, 80, 5, 3.0, 4.0, 120); + testRoundtripSuccessive(data, 80, 5, 3.0, 4.0, 120); + } + + private CompressedSizeInfo createTestCompressedSizeInfo() { + IColIndex cols = ColIndexFactory.create(new int[] {0}); + EstimationFactors facts = new EstimationFactors(2, 10); + + 
CompressedSizeInfoColGroup info = new CompressedSizeInfoColGroup(cols, facts, + AColGroup.CompressionType.PiecewiseLinear); + + List<CompressedSizeInfoColGroup> infos = Arrays.asList(info); + CompressedSizeInfo csi = new CompressedSizeInfo(infos); + + return csi; + } + + @Test + public void testCompressPiecewiseLinearViaRealAPI() { + + MatrixBlock in = new MatrixBlock(10, 1, false); + in.allocateDenseBlock(); + for(int r = 0; r < 10; r++) { + in.set(r, 0, r * 0.5); + } + + CompressionSettings cs = new CompressionSettingsBuilder().addValidCompression( + AColGroup.CompressionType.PiecewiseLinear).create(); + + CompressedSizeInfo csi = createTestCompressedSizeInfo(); + + List<AColGroup> colGroups = ColGroupFactory.compressColGroups(in, csi, cs); + + boolean hasPiecewise = colGroups.stream().anyMatch(cg -> cg instanceof ColGroupPiecewiseLinearCompressed); + assertTrue(hasPiecewise); + } + + @Test + public void testSuccessiveLinearColumnSingleSegment() { + double[] col = {1.0, 2.0, 3.0, 4.0, 5.0}; + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1e-6); + + List<Integer> breaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs); + assertEquals("[0, 5]", breaks.toString()); + } + + @Test + public void testSuccessiveNoisyColumnMultipleSegments() { + double[] col = {1.1, 1.9, 2.2, 10.1, 10.8, 11.3}; + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.0); + + List<Integer> breaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs); + assertTrue("expected at least 3 breakpoints", breaks.size() >= 3); + } + + @Test + public void testSuccessiveStrictLossProducesMoreSegments() { + double[] col = {1, 2, 3, 10, 11, 12, 20, 21, 22}; + + CompressionSettings strict = new CompressionSettingsBuilder().create(); + strict.setPiecewiseTargetLoss(0.01); + + CompressionSettings loose = new CompressionSettingsBuilder().create(); + loose.setPiecewiseTargetLoss(10.0); + + 
List<Integer> strictBreaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, strict); + List<Integer> looseBreaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, loose); + + assertTrue("strict loss should produce more segments", strictBreaks.size() > looseBreaks.size()); + } + + @Test + public void testSuccessiveBreakpointDetectedAtJump() { + double[] col = getRandomColumn(30, SEED); + for(int r = 10; r < 20; r++) + col[r] += 8.0; + + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(2.0); + + int[] bps = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs).stream().mapToInt(Integer::intValue) + .toArray(); + + assertTrue("expected at least 3 segments", bps.length >= 3); + assertTrue("expected breakpoint near jump [10,20]", hasBreakInRange(bps, 8, 22)); + } + + @Test + public void testSuccessiveGlobalMSEWithinTarget() { + double[] col = getRandomColumn(40, SEED + 1); + CompressionSettings cs = new CompressionSettingsBuilder().create(); + cs.setPiecewiseTargetLoss(1.5); + + List<Integer> bps = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs); + double sse = 0.0; + for(int i = 0; i < bps.size() - 1; i++) + sse += PiecewiseLinearUtils.computeSegmentCost(col, bps.get(i), bps.get(i + 1)); + + double mse = sse / col.length; + assertTrue("global MSE=" + mse + " exceeds target=" + cs.getPiecewiseTargetLoss(), + mse <= cs.getPiecewiseTargetLoss() + 1e-10); + } + + private boolean hasBreakInRange(int[] bps, int min, int max) { + for(int i = 1; i < bps.length - 1; i++) + if(bps[i] >= min && bps[i] <= max) + return true; + return false; + } + + private double[] getRandomColumn(int len, long seed) { + Random rng = new Random(seed); + double[] col = new double[len]; + for(int i = 0; i < len; i++) + col[i] = rng.nextGaussian() * 2 + i * 0.01; + return col; + } + +} + +
