janniklinde commented on code in PR #2420:
URL: https://github.com/apache/systemds/pull/2420#discussion_r2745520119
##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ColGroupPiecewiseLinearCompressed.java:
##########
Review Comment:
Move this class from `colgroup/scheme` package to `colgroup/`.
In general, all methods that are currently unimplemented should `throw new NotImplementedException()`.
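A stub following that advice could look like the sketch below (standalone illustration; SystemDS typically uses `org.apache.commons.lang3.NotImplementedException`, but this sketch throws the JDK's `UnsupportedOperationException` so it runs without dependencies — class and method names are illustrative only):

```java
// Sketch: fail loudly in unimplemented overrides instead of silently
// returning defaults. Names are hypothetical, not the actual SystemDS API.
public class NotImplementedStubSketch {

	// Stand-in for an unimplemented AColGroupCompressed override.
	public double computeMxx(double c) {
		throw new UnsupportedOperationException("computeMxx is not implemented yet");
	}

	public static void main(String[] args) {
		try {
			new NotImplementedStubSketch().computeMxx(0.0);
		}
		catch(UnsupportedOperationException e) {
			System.out.println("caught: " + e.getMessage());
		}
	}
}
```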
##########
bin/systemds-standalone.sh:
##########
Review Comment:
This file should not be part of the PR. You can keep it locally, but you should untrack it and not add it to your commits. You could use `git rm --cached bin/systemds-standalone.sh`.
##########
src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java:
##########
@@ -135,6 +136,21 @@ public class CompressionSettings {
public final boolean preferDeltaEncoding;
+ /**
+ * Ziel-Gesantverlust für piecewise Lineace Komocession• Interpretation: maximal entaubter Alobaler MSE pro Went in
+ * der Sealte. O.O ~ quasi verlustfrei, viele Segmente >0 ~ mehr Approximation entaubt, weniger Segmente
+ */
Review Comment:
Weird comment
##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java:
##########
Review Comment:
It seems like you reformatted the file to revert the tabs -> spaces
conversion, which is good. However, there are still many unnecessary changes. I
would recommend you revert that file to the original state of this repository
and then only add the `enum CompressionType PiecewiseLinear`
##########
pom.xml:
##########
Review Comment:
Please revert that file. You may keep your changes locally, but these changes should not be pushed.
##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java:
##########
@@ -1066,6 +1074,180 @@ private static AColGroup compressLinearFunctional(IColIndex colIndexes, MatrixBl
		return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows);
	}
+	public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in, CompressionSettings cs) {
+
+		// First, store the contents of one column
+
+		int numRows = in.getNumRows();
+		int colIdx = colIndexes.get(0); // the first column
Review Comment:
You take the first column, which is fine for now, but in a finished
implementation you would either repeat compression on every column or do a
multidimensional regression, where you treat a 'row' of all indices as a vector.
##########
src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java:
##########
Review Comment:
Here please revert the file. Did you change anything in this file (except
tabs->spaces which you should be reverted)?
You might consider creating a variable `double targetLoss` and a method
`public CompressionSettingsBuilder setTargetLoss(double loss) {...}`. If you
then add the `targetLoss` as a variable in the `CompressionSettings`
constructor, you directly set the target loss via `CompressionSettingsBuilder`
##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java:
##########
@@ -1066,6 +1074,180 @@ private static AColGroup compressLinearFunctional(IColIndex colIndexes, MatrixBl
		return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows);
	}
+	public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in, CompressionSettings cs) {
+
+		// First, store the contents of one column
+
+		int numRows = in.getNumRows();
+		int colIdx = colIndexes.get(0); // the first column
+		double[] column = getColumn(in, colIdx);
+
+		// Set the target loss
+
+		// Determine breakpoints: partitioning into segments
+
+		List<Integer> breakpointsList = computeBreakpoints(cs, column);
+		int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray();
+		// For each segment, linear regression as the compression scheme
+
+		// 3) Per-segment regression -> a,b
+		int numSeg = breakpoints.length - 1;
+		double[] slopes = new double[numSeg];
+		double[] intercepts = new double[numSeg];
+
+		for(int s = 0; s < numSeg; s++) {
+			int start = breakpoints[s];
+			int end = breakpoints[s + 1];
+
+			double[] ab = regressSegment(column, start, end); // uses the same stats as computeSegmentCost
+			slopes[s] = ab[0];
+			intercepts[s] = ab[1];
+		}
+		// Build the data structure: PiecewiseLinearColGroupCompressed
+
+		return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpoints, slopes, intercepts, numRows);
+	}
+
+	public static double[] getColumn(MatrixBlock in, int colIndex) {
+		int numRows = in.getNumRows(); // number of rows
+		double[] column = new double[numRows]; // buffer for the column
+
+		for(int r = 0; r < numRows; r++) {
+			column[r] = in.get(r, colIndex); // read the value at (r, colIndex)
+		}
+		return column;
+	}
+
+	public static List<Integer> computeBreakpoints(CompressionSettings cs, double[] column) {
+		int n = column.length;
+		double targetMSE = cs.getPiecewiseTargetLoss();
+		// Case A: no target loss given -> simple variant with a fixed λ
+		if(Double.isNaN(targetMSE) || targetMSE <= 0) {
+			double lambda = 5.0;
+			return computeBreakpointsLambda(column, lambda);
+		}
+
+		// Case B: target loss set -> respect a global error budget
+		double sseMax = n * targetMSE; // MSE -> SSE budget
+
+		double lambdaMin = 0.0; // many segments, minimal error
+		double lambdaMax = 1e6; // few segments, more error
+
+		List<Integer> bestBreaks = null;
+
+		for(int it = 0; it < 20; it++) { // binary search on λ
+			double lambda = 0.5 * (lambdaMin + lambdaMax);
+
+			List<Integer> breaks = computeBreakpointsLambda(column, lambda);
+			double totalSSE = computeTotalSSE(column, breaks);
+
+			if(totalSSE <= sseMax) {
+				// budget met: try a larger λ to get even fewer segments
+				bestBreaks = breaks;
+				lambdaMin = lambda;
+			}
+			else {
+				// error too large: decrease λ, allow more segments
+				lambdaMax = lambda;
+			}
+		}
+
+		if(bestBreaks == null)
+			bestBreaks = computeBreakpointsLambda(column, lambdaMin);
+
+		return bestBreaks;
+	}
+
+	public static List<Integer> computeBreakpointsLambda(double[] column, double lambda) {
+		int sizeColumn = column.length;
+		double[] dp = new double[sizeColumn + 1];
+		int[] prev = new int[sizeColumn + 1];
+
+		dp[0] = 0.0;
+
+		for(int index = 1; index <= sizeColumn; index++) {
+			dp[index] = Double.POSITIVE_INFINITY;
+			for(int i = 0; i < index; i++) { // segment [i, index)
+				double costCurrentSegment = computeSegmentCost(column, i, index); // SSE
+				double candidateCost = dp[i] + costCurrentSegment + lambda;
+				if(candidateCost < dp[index]) {
+					dp[index] = candidateCost;
+					prev[index] = i;
+				}
+			}
+		}
+
+		List<Integer> segmentLimits = new ArrayList<>();
+		int breakpointIndex = sizeColumn;
+		while(breakpointIndex > 0) {
+			segmentLimits.add(breakpointIndex);
+			breakpointIndex = prev[breakpointIndex];
+		}
+		segmentLimits.add(0);
+		Collections.sort(segmentLimits);
+		return segmentLimits;
+	}
+
+	public static double computeSegmentCost(double[] column, int start, int end) {
+		int n = end - start;
+		if(n <= 1)
+			return 0.0;
+
+		double[] ab = regressSegment(column, start, end);
+		double slope = ab[0];
+		double intercept = ab[1];
+
+		double sse = 0.0;
+		for(int i = start; i < end; i++) {
+			double x = i;
+			double y = column[i];
+			double yhat = slope * x + intercept;
+			double diff = y - yhat;
+			sse += diff * diff;
+		}
+		return sse; // or sse / n as MSE
+	}
+
+	public static double computeTotalSSE(double[] column, List<Integer> breaks) {
+		double total = 0.0;
+		for(int s = 0; s < breaks.size() - 1; s++) {
+			int start = breaks.get(s);
+			int end = breaks.get(s + 1);
+			total += computeSegmentCost(column, start, end); // SSE of the segment
+		}
+		return total;
+	}
+
+	public static double[] regressSegment(double[] column, int start, int end) {
+		int n = end - start;
+		if(n <= 0)
+			return new double[] {0.0, 0.0};
+
+		double sumX = 0, sumY = 0, sumXX = 0, sumXY = 0;
+		for(int i = start; i < end; i++) {
+			double x = i;
+			double y = column[i];
+			sumX += x;
+			sumY += y;
+			sumXX += x * x;
+			sumXY += x * y;
+		}
+
+		double nD = n;
+		double denom = nD * sumXX - sumX * sumX;
+		double slope, intercept;
+		if(denom == 0) {
+			slope = 0.0;
+			intercept = sumY / nD;
+		}
+		else {
+			slope = (nD * sumXY - sumX * sumY) / denom;
+			intercept = (sumY - slope * sumX) / nD;
+		}
+		return new double[] {slope, intercept};
+	}
+
+
Review Comment:
To keep this file clean, I recommend that you create a new class called
`PiecewiseLinearUtils` in the package `functional`. Your
`compressPiecewiseLinearFunctional(...)` then just calls
`PiecewiseLinearUtils.compressSegmentedLeastSquares(...)`.
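To illustrate the proposed split, the per-segment least squares from this hunk could live in the utility class like so (the class name `PiecewiseLinearUtilsSketch` is a hypothetical stand-in for the suggested `PiecewiseLinearUtils`; the math mirrors `regressSegment` from the diff):

```java
// Sketch of the suggested separation: ColGroupFactory delegates the numeric
// work to a utility class. Only the per-segment least squares is shown;
// the DP over breakpoints would move here as well.
public class PiecewiseLinearUtilsSketch {

	// Ordinary least squares over column[start, end) with x = row index.
	// Returns {slope, intercept}.
	public static double[] regressSegment(double[] column, int start, int end) {
		int n = end - start;
		if(n <= 0)
			return new double[] {0.0, 0.0};
		double sumX = 0, sumY = 0, sumXX = 0, sumXY = 0;
		for(int i = start; i < end; i++) {
			sumX += i;
			sumY += column[i];
			sumXX += (double) i * i;
			sumXY += i * column[i];
		}
		double denom = n * sumXX - sumX * sumX;
		if(denom == 0) // constant x range: fall back to the mean
			return new double[] {0.0, sumY / n};
		double slope = (n * sumXY - sumX * sumY) / denom;
		return new double[] {slope, (sumY - slope * sumX) / n};
	}
}
```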
##########
use-java17-systemds.sh:
##########
Review Comment:
Please remove that file from the PR.
##########
src/test/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java:
##########
Review Comment:
There should be no underscores in method names.
Move this test file to `test/component/compress/colgroup`.
You have a lot of isolated tests (which also look autogenerated rather than handwritten). It would be nice to have more meaningful tests. Please remove some redundant ones, and add tests on randomly generated data (with a fixed seed) where you create a `ColGroupPiecewiseLinearCompressed` and then `decompressToDenseBlock`. Then compare the result to the original data and compute a loss (which should not exceed some upper bound).
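Such a round-trip test could be sketched like this (standalone illustration with a fixed seed; `fitLine` and `roundTripMSE` are hypothetical stand-ins for the actual `create(...)` / `decompressToDenseBlock(...)` calls, using a single segment for simplicity):

```java
import java.util.Random;

// Standalone sketch of the suggested test shape: random data (fixed seed),
// "compress" -> "decompress" -> compare against the original under a bound.
public class PiecewiseRoundTripSketch {

	// Least squares over the whole column with x = index; returns {slope, intercept}.
	static double[] fitLine(double[] y) {
		int n = y.length;
		double sumX = 0, sumY = 0, sumXX = 0, sumXY = 0;
		for(int i = 0; i < n; i++) {
			sumX += i;
			sumY += y[i];
			sumXX += (double) i * i;
			sumXY += i * y[i];
		}
		double denom = n * sumXX - sumX * sumX;
		double slope = denom == 0 ? 0 : (n * sumXY - sumX * sumY) / denom;
		return new double[] {slope, (sumY - slope * sumX) / n};
	}

	// Generates linear data plus Gaussian noise, fits it, and returns the MSE
	// between the original and the reconstructed values.
	public static double roundTripMSE(long seed, int n, double noise) {
		Random rnd = new Random(seed); // fixed seed -> reproducible test data
		double[] y = new double[n];
		for(int i = 0; i < n; i++)
			y[i] = 0.5 * i + 3 + noise * rnd.nextGaussian();
		double[] ab = fitLine(y); // "compress"
		double sse = 0;
		for(int i = 0; i < n; i++) { // "decompress" and compare
			double diff = y[i] - (ab[0] * i + ab[1]);
			sse += diff * diff;
		}
		return sse / n;
	}
}
```

The assertion in the real test would then bound this loss by the configured target loss rather than a hard-coded constant.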
##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ColGroupPiecewiseLinearCompressed.java:
##########
@@ -0,0 +1,392 @@
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.AColGroupCompressed;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.instructions.cp.CmCovObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.CMOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+
+import java.util.Arrays;
+
+public class ColGroupPiecewiseLinearCompressed extends AColGroupCompressed {
+
+ IColIndex colIndexes;
+ int[] breakpoints;
+ double[] slopes;
+ double[] intercepts;
+ int numRows;
+
+ protected ColGroupPiecewiseLinearCompressed(IColIndex colIndices) {
+ super(colIndices);
+ }
+
+ public ColGroupPiecewiseLinearCompressed(IColIndex colIndexes, int[] breakpoints, double[] slopes,
+ double[] intercepts, int numRows) {
+ super(colIndexes);
+ this.breakpoints = breakpoints;
+ this.slopes = slopes;
+ this.intercepts = intercepts;
+ this.numRows = numRows;
+ }
+
+ public static AColGroup create(IColIndex colIndexes, int[] breakpoints, double[] slopes, double[] intercepts,
+ int numRows) {
+ if(breakpoints == null || breakpoints.length < 2)
+ throw new IllegalArgumentException("Need at least one segment");
+
+ int numSeg = breakpoints.length - 1;
+ if(slopes.length != numSeg || intercepts.length != numSeg)
+ throw new IllegalArgumentException("Inconsistent segment arrays");
+
+ int[] bpCopy = Arrays.copyOf(breakpoints, breakpoints.length);
+ double[] slopeCopy = Arrays.copyOf(slopes, slopes.length);
+ double[] interceptCopy = Arrays.copyOf(intercepts, intercepts.length);
+
+ return new ColGroupPiecewiseLinearCompressed(colIndexes, bpCopy, slopeCopy, interceptCopy, numRows);
+
+ }
+
+ @Override
+ public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC) {
+
+ if(db == null || _colIndexes == null || _colIndexes.size() == 0 || breakpoints == null || slopes == null ||
+ intercepts == null) {
+ return;
+ }
+
+ int numSeg = breakpoints.length - 1;
+ if(numSeg <= 0 || rl >= ru) {
+ return;
+ }
+
+ final int col = _colIndexes.get(0);
+
+ for(int s = 0; s < numSeg; s++) {
+ int segStart = breakpoints[s];
+ int segEnd = breakpoints[s + 1];
+ if(segStart >= segEnd)
+ continue; // Invalid Segment
+
+ double a = slopes[s];
+ double b = intercepts[s];
+
+ int rs = Math.max(segStart, rl);
+ int re = Math.min(segEnd, ru);
+ if(rs >= re)
+ continue;
+
+ for(int r = rs; r < re; r++) {
+ double yhat = a * r + b;
+ int gr = offR + r;
+ int gc = offC + col;
+
+ if(gr >= 0 && gr < db.numRows() && gc >= 0 && gc < db.numCols()) {
+ db.set(gr, gc, yhat);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected double computeMxx(double c, Builtin builtin) {
+ return 0;
+ }
+
+ @Override
+ protected void computeColMxx(double[] c, Builtin builtin) {
+
+ }
+
+ @Override
+ protected void computeSum(double[] c, int nRows) {
+
+ }
+
+ @Override
+ protected void computeSumSq(double[] c, int nRows) {
+
+ }
+
+ @Override
+ protected void computeColSumsSq(double[] c, int nRows) {
+
+ }
+
+ @Override
+ protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) {
+
+ }
+
+ @Override
+ protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) {
+
+ }
+
+ @Override
+ protected void computeProduct(double[] c, int nRows) {
+
+ }
+
+ @Override
+ protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) {
+
+ }
+
+ @Override
+ protected void computeColProduct(double[] c, int nRows) {
+
+ }
+
+ @Override
+ protected double[] preAggSumRows() {
+ return new double[0];
+ }
+
+ @Override
+ protected double[] preAggSumSqRows() {
+ return new double[0];
+ }
+
+ @Override
+ protected double[] preAggProductRows() {
+ return new double[0];
+ }
+
+ @Override
+ protected double[] preAggBuiltinRows(Builtin builtin) {
+ return new double[0];
+ }
+
+ @Override
+ public boolean sameIndexStructure(AColGroupCompressed that) {
+ return false;
+ }
+
+ @Override
+ protected void tsmm(double[] result, int numColumns, int nRows) {
+
+ }
+
+ @Override
+ public AColGroup copyAndSet(IColIndex colIndexes) {
+ return null;
+ }
+
+ @Override
+ public void decompressToDenseBlockTransposed(DenseBlock db, int rl, int ru) {
+
+ }
+
+ @Override
+ public void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int nColOut) {
+
+ }
+
+ @Override
+ public double getIdx(int r, int colIdx) {
+ // ✅ CRUCIAL: Bounds-Check für colIdx!
Review Comment:
Avoid emojis. Also, they are usually a hint of LLM-generated code (which is strictly forbidden for your submissions).
##########
src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java:
##########
Review Comment:
Please revert
##########
src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java:
##########
@@ -19,8 +19,10 @@
package org.apache.sysds.test.component.compress.colgroup;
+import static org.apache.sysds.runtime.compress.colgroup.ColGroupFactory.computeSegmentCost;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
Review Comment:
Remove the Jupiter assertions; they will cause the build to fail, as we don't use JUnit Jupiter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]