janniklinde commented on code in PR #2420:
URL: https://github.com/apache/systemds/pull/2420#discussion_r2745520119


##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ColGroupPiecewiseLinearCompressed.java:
##########


Review Comment:
   Move this class from `colgroup/scheme` package to `colgroup/`.
   In general, all methods that are currently unimplemented should `throw new NotImplementedException()`.
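
   For illustration, a self-contained sketch of that stub pattern; SystemDS uses `org.apache.commons.lang3.NotImplementedException`, but since that is a third-party class, this sketch substitutes the JDK's `UnsupportedOperationException`, and the class and method names here are hypothetical:

```java
// Illustrative stub pattern: unimplemented overrides fail loudly instead of
// silently returning defaults. UnsupportedOperationException stands in for
// org.apache.commons.lang3.NotImplementedException in this dependency-free sketch.
public class StubPatternSketch {
	protected void computeSum(double[] c, int nRows) {
		throw new UnsupportedOperationException("computeSum is not implemented yet");
	}

	public static boolean throwsOnCall() {
		try {
			new StubPatternSketch().computeSum(new double[1], 1);
			return false; // a silent no-op would hide the missing functionality
		}
		catch(UnsupportedOperationException e) {
			return true;
		}
	}
}
```

   The point is that a caller immediately sees which path is unsupported, rather than getting a wrong result from an empty method body.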



##########
bin/systemds-standalone.sh:
##########


Review Comment:
   This file should not be part of the PR. You can keep it locally but you should untrack it and not add it to your commits. You could use `git rm --cached bin/systemds-standalone.sh`.



##########
src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java:
##########
@@ -135,6 +136,21 @@ public class CompressionSettings {
 
        public final boolean preferDeltaEncoding;
 
+       /**
+        * Target total loss for piecewise linear compression. Interpretation: maximum allowed global MSE per value in
+        * the column. 0.0 ~ quasi lossless, many segments; >0 ~ more approximation allowed, fewer segments
+        */

Review Comment:
   Weird comment



##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java:
##########


Review Comment:
   It seems like you reformatted the file to revert the tabs -> spaces conversion, which is good. However, there are still many unnecessary changes. I would recommend that you revert this file to the original state of this repository and then only re-add the `PiecewiseLinear` value to the `CompressionType` enum.



##########
pom.xml:
##########


Review Comment:
   Please revert that file. You may keep your changes locally, but these changes should not be pushed.



##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java:
##########
@@ -1066,6 +1074,180 @@ private static AColGroup compressLinearFunctional(IColIndex colIndexes, MatrixBl
                return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows);
        }
 
+       public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in,
+               CompressionSettings cs) {
+
+               // First, extract the contents of one column
+
+               int numRows = in.getNumRows();
+               int colIdx = colIndexes.get(0); // the first column

Review Comment:
   You take the first column, which is fine for now, but in a finished 
implementation you would either repeat compression on every column or do a 
multidimensional regression, where you treat a 'row' of all indices as a vector.
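
   As a rough, dependency-free illustration of the per-column variant (a plain `double[][]` stands in for `MatrixBlock`; all names are hypothetical):

```java
// Hypothetical sketch: run an independent per-column "compression" (here just an
// OLS line fit against the row index) for every column, the way a finished
// implementation would repeat the piecewise scheme per column.
public class PerColumnSketch {
	/** Returns {slope, intercept} of a least-squares line fit, one pair per column. */
	public static double[][] fitEachColumn(double[][] in) {
		int numRows = in.length;
		int numCols = in[0].length;
		double[][] fits = new double[numCols][];
		for(int c = 0; c < numCols; c++) {
			double sumX = 0, sumY = 0, sumXX = 0, sumXY = 0;
			for(int r = 0; r < numRows; r++) {
				sumX += r;
				sumY += in[r][c];
				sumXX += (double) r * r;
				sumXY += r * in[r][c];
			}
			double denom = numRows * sumXX - sumX * sumX;
			double slope = denom == 0 ? 0.0 : (numRows * sumXY - sumX * sumY) / denom;
			double intercept = (sumY - slope * sumX) / numRows;
			fits[c] = new double[] {slope, intercept};
		}
		return fits;
	}
}
```

   The multidimensional alternative would instead fit one model over row-vectors spanning all indices at once.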



##########
src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java:
##########


Review Comment:
   Here please revert the file. Did you change anything in it (except the tabs -> spaces conversion, which should itself be reverted)?
   
   You might consider creating a variable `double targetLoss` and a method `public CompressionSettingsBuilder setTargetLoss(double loss) {...}`. If you then add `targetLoss` as a parameter of the `CompressionSettings` constructor, you can directly set the target loss via the `CompressionSettingsBuilder`.



##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java:
##########
@@ -1066,6 +1074,180 @@ private static AColGroup compressLinearFunctional(IColIndex colIndexes, MatrixBl
                return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows);
        }
 
+       public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in,
+               CompressionSettings cs) {
+
+               // First, extract the contents of one column
+
+               int numRows = in.getNumRows();
+               int colIdx = colIndexes.get(0); // the first column
+               double[] column = getColumn(in, colIdx);
+
+               // Set the target loss
+
+               // Determine breakpoints: partition into segments
+
+               List<Integer> breakpointsList = computeBreakpoints(cs, column);
+               int[] breakpoints = breakpointsList.stream().mapToInt(Integer::intValue).toArray();
+               // For each segment, linear regression as the compression scheme
+
+               // 3) Per-segment regression -> a,b
+               int numSeg = breakpoints.length - 1;
+               double[] slopes = new double[numSeg];
+               double[] intercepts = new double[numSeg];
+
+               for(int s = 0; s < numSeg; s++) {
+                       int start = breakpoints[s];
+                       int end = breakpoints[s + 1];
+
+                       double[] ab = regressSegment(column, start, end); // uses the same stats as computeSegmentCost
+                       slopes[s] = ab[0];
+                       intercepts[s] = ab[1];
+               }
+               // Build the data structure: ColGroupPiecewiseLinearCompressed
+
+               return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpoints, slopes, intercepts, numRows);
+       }
+
+       public static double[] getColumn(MatrixBlock in, int colIndex) {
+               int numRows = in.getNumRows();          // number of rows
+               double[] column = new double[numRows];  // buffer for the column
+
+               for(int r = 0; r < numRows; r++) {
+                       column[r] = in.get(r, colIndex);  // read value (r, colIndex)
+               }
+               return column;
+       }
+
+       public static List<Integer> computeBreakpoints(CompressionSettings cs, double[] column) {
+               int n = column.length;
+               double targetMSE = cs.getPiecewiseTargetLoss();
+               // Case A: no target loss given -> simple variant with fixed λ
+               if(Double.isNaN(targetMSE) || targetMSE <= 0) {
+                       double lambda = 5.0;
+                       return computeBreakpointsLambda(column, lambda);
+               }
+
+               // Case B: target loss set -> respect a global error budget
+               double sseMax = n * targetMSE; // MSE -> SSE budget
+
+               double lambdaMin = 0.0;   // many segments, minimal error
+               double lambdaMax = 1e6;   // few segments, more error
+
+               List<Integer> bestBreaks = null;
+
+               for(int it = 0; it < 20; it++) { // binary search on λ
+                       double lambda = 0.5 * (lambdaMin + lambdaMax);
+
+                       List<Integer> breaks = computeBreakpointsLambda(column, lambda);
+                       double totalSSE = computeTotalSSE(column, breaks);
+
+                       if(totalSSE <= sseMax) {
+                               // budget met: try a larger λ for even fewer segments
+                               bestBreaks = breaks;
+                               lambdaMin = lambda;
+                       }
+                       else {
+                               // error too large: decrease λ, allow more segments
+                               lambdaMax = lambda;
+                       }
+               }
+
+               if(bestBreaks == null)
+                       bestBreaks = computeBreakpointsLambda(column, lambdaMin);
+
+               return bestBreaks;
+       }
+
+       public static List<Integer> computeBreakpointsLambda(double[] column, double lambda) {
+               int sizeColumn = column.length;
+               double[] dp = new double[sizeColumn + 1];
+               int[] prev = new int[sizeColumn + 1];
+
+               dp[0] = 0.0;
+
+               for(int index = 1; index <= sizeColumn; index++) {
+                       dp[index] = Double.POSITIVE_INFINITY;
+                       for(int i = 0; i < index; i++) { // Segment [i, index)
                               double costCurrentSegment = computeSegmentCost(column, i, index); // SSE
                               double candidateCost = dp[i] + costCurrentSegment + lambda;
+                               if(candidateCost < dp[index]) {
+                                       dp[index] = candidateCost;
+                                       prev[index] = i;
+                               }
+                       }
+               }
+
+               List<Integer> segmentLimits = new ArrayList<>();
+               int breakpointIndex = sizeColumn;
+               while(breakpointIndex > 0) {
+                       segmentLimits.add(breakpointIndex);
+                       breakpointIndex = prev[breakpointIndex];
+               }
+               segmentLimits.add(0);
+               Collections.sort(segmentLimits);
+               return segmentLimits;
+       }
+
+       public static double computeSegmentCost(double[] column, int start, int end) {
+               int n = end - start;
+               if(n <= 1)
+                       return 0.0;
+
+               double[] ab = regressSegment(column, start, end);
+               double slope = ab[0];
+               double intercept = ab[1];
+
+               double sse = 0.0;
+               for(int i = start; i < end; i++) {
+                       double x = i;
+                       double y = column[i];
+                       double yhat = slope * x + intercept;
+                       double diff = y - yhat;
+                       sse += diff * diff;
+               }
+               return sse; // or sse / n as MSE
+       }
+
+       public static double computeTotalSSE(double[] column, List<Integer> breaks) {
+               double total = 0.0;
+               for(int s = 0; s < breaks.size() - 1; s++) {
+                       int start = breaks.get(s);
+                       int end = breaks.get(s + 1);
+                       total += computeSegmentCost(column, start, end); // SSE of the segment
+               }
+               return total;
+       }
+
+       public static double[] regressSegment(double[] column, int start, int end) {
+               int n = end - start;
+               if(n <= 0)
+                       return new double[] {0.0, 0.0};
+
+               double sumX = 0, sumY = 0, sumXX = 0, sumXY = 0;
+               for(int i = start; i < end; i++) {
+                       double x = i;
+                       double y = column[i];
+                       sumX += x;
+                       sumY += y;
+                       sumXX += x * x;
+                       sumXY += x * y;
+               }
+
+               double nD = n;
+               double denom = nD * sumXX - sumX * sumX;
+               double slope, intercept;
+               if(denom == 0) {
+                       slope = 0.0;
+                       intercept = sumY / nD;
+               }
+               else {
+                       slope = (nD * sumXY - sumX * sumY) / denom;
+                       intercept = (sumY - slope * sumX) / nD;
+               }
+               return new double[] {slope, intercept};
+       }
+

Review Comment:
   To keep this file clean, I recommend that you create a new class called 
`PiecewiseLinearUtils` in the package `functional`. Your 
`compressPiecewiseLinearFunctional(...)` then just calls 
`PiecewiseLinearUtils.compressSegmentedLeastSquares(...)`.
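
   A dependency-free sketch of the kind of helpers such a utility class could collect (the class name carries a `Sketch` suffix to stress that this body is illustrative, not existing SystemDS code):

```java
// Illustrative helpers a PiecewiseLinearUtils class could hold: an OLS fit per
// segment and its SSE, the two primitives the segmented-least-squares DP needs.
public class PiecewiseLinearUtilsSketch {
	/** Ordinary least squares over column[start, end); returns {slope, intercept}. */
	public static double[] fitSegment(double[] column, int start, int end) {
		int n = end - start;
		if(n <= 0)
			return new double[] {0.0, 0.0};
		double sumX = 0, sumY = 0, sumXX = 0, sumXY = 0;
		for(int i = start; i < end; i++) {
			sumX += i;
			sumY += column[i];
			sumXX += (double) i * i;
			sumXY += i * column[i];
		}
		double denom = n * sumXX - sumX * sumX;
		if(denom == 0)
			return new double[] {0.0, sumY / n}; // constant fit when x has no spread
		double slope = (n * sumXY - sumX * sumY) / denom;
		return new double[] {slope, (sumY - slope * sumX) / n};
	}

	/** Sum of squared errors of the OLS fit over column[start, end). */
	public static double segmentSSE(double[] column, int start, int end) {
		double[] ab = fitSegment(column, start, end);
		double sse = 0;
		for(int i = start; i < end; i++) {
			double diff = column[i] - (ab[0] * i + ab[1]);
			sse += diff * diff;
		}
		return sse;
	}
}
```

   `ColGroupFactory` would then stay a thin dispatcher while the regression and DP machinery lives in one place.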



##########
use-java17-systemds.sh:
##########


Review Comment:
   Please remove that file from the PR.



##########
src/test/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java:
##########


Review Comment:
   There should be no underscores in method names.
   
   Move this test file to `test/component/compress/colgroup`.
   
   You have a lot of isolated tests (which also look like autogenerated tests 
and not handwritten). It would be nice to have more tests. Please remove some 
redundant ones, and add tests on randomly generated data (with a fixed seed) 
where you create a `ColGroupPiecewiseLinearCompressed` and then 
`decompressToDenseBlock`. You then compare it to the original data and compute 
a loss (which should be no more than some upper bound).
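
   One possible shape for such a round-trip test, sketched without SystemDS dependencies (a single least-squares segment stands in for `create` + `decompressToDenseBlock`; all names are illustrative):

```java
import java.util.Random;

// Sketch of the suggested test pattern: seeded random data -> fit ("compress")
// -> re-evaluate the line ("decompress") -> compare against an MSE upper bound.
public class PiecewiseLinearRoundTripSketch {
	public static double roundTripMSE(long seed, int numRows) {
		Random rnd = new Random(seed); // fixed seed keeps the test reproducible
		double[] column = new double[numRows];
		for(int r = 0; r < numRows; r++)
			column[r] = 0.5 * r + rnd.nextGaussian() * 0.01; // near-linear data

		// "compress": fit one OLS segment over the whole column
		double sumX = 0, sumY = 0, sumXX = 0, sumXY = 0;
		for(int i = 0; i < numRows; i++) {
			sumX += i;
			sumY += column[i];
			sumXX += (double) i * i;
			sumXY += i * column[i];
		}
		double denom = numRows * sumXX - sumX * sumX;
		double slope = (numRows * sumXY - sumX * sumY) / denom;
		double intercept = (sumY - slope * sumX) / numRows;

		// "decompress" and accumulate the reconstruction loss
		double sse = 0;
		for(int i = 0; i < numRows; i++) {
			double diff = column[i] - (slope * i + intercept);
			sse += diff * diff;
		}
		return sse / numRows; // MSE to compare against an upper bound
	}
}
```

   The real test would do the same with `ColGroupPiecewiseLinearCompressed` and a `DenseBlock`, asserting the MSE stays below the configured target loss.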



##########
src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ColGroupPiecewiseLinearCompressed.java:
##########
@@ -0,0 +1,392 @@
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.AColGroupCompressed;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.instructions.cp.CmCovObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.CMOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+
+import java.util.Arrays;
+
+public class ColGroupPiecewiseLinearCompressed extends AColGroupCompressed {
+
+       IColIndex colIndexes;
+       int[] breakpoints;
+       double[] slopes;
+       double[] intercepts;
+       int numRows;
+
+       protected ColGroupPiecewiseLinearCompressed(IColIndex colIndices) {
+               super(colIndices);
+       }
+
+       public ColGroupPiecewiseLinearCompressed(IColIndex colIndexes, int[] breakpoints, double[] slopes,
+               double[] intercepts, int numRows) {
+               super(colIndexes);
+               this.breakpoints = breakpoints;
+               this.slopes = slopes;
+               this.intercepts = intercepts;
+               this.numRows = numRows;
+       }
+
+       public static AColGroup create(IColIndex colIndexes, int[] breakpoints, double[] slopes, double[] intercepts,
+               int numRows) {
+               if(breakpoints == null || breakpoints.length < 2)
+                       throw new IllegalArgumentException("Need at least one segment");
+
+               int numSeg = breakpoints.length - 1;
+               if(slopes.length != numSeg || intercepts.length != numSeg)
+                       throw new IllegalArgumentException("Inconsistent segment arrays");
+
+               int[] bpCopy = Arrays.copyOf(breakpoints, breakpoints.length);
+               double[] slopeCopy = Arrays.copyOf(slopes, slopes.length);
+               double[] interceptCopy = Arrays.copyOf(intercepts, intercepts.length);
+
+               return new ColGroupPiecewiseLinearCompressed(colIndexes, bpCopy, slopeCopy, interceptCopy, numRows);
+
+       }
+
+       @Override
+       public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC) {
+
+               if(db == null || _colIndexes == null || _colIndexes.size() == 0 || breakpoints == null || slopes == null ||
+                       intercepts == null) {
+                       return;
+               }
+
+               int numSeg = breakpoints.length - 1;
+               if(numSeg <= 0 || rl >= ru) {
+                       return;
+               }
+
+               final int col = _colIndexes.get(0);
+
+               for(int s = 0; s < numSeg; s++) {
+                       int segStart = breakpoints[s];
+                       int segEnd = breakpoints[s + 1];
+                       if(segStart >= segEnd)
+                               continue;  // Invalid Segment
+
+                       double a = slopes[s];
+                       double b = intercepts[s];
+
+                       int rs = Math.max(segStart, rl);
+                       int re = Math.min(segEnd, ru);
+                       if(rs >= re)
+                               continue;
+
+                       for(int r = rs; r < re; r++) {
+                               double yhat = a * r + b;
+                               int gr = offR + r;
+                               int gc = offC + col;
+
+                               if(gr >= 0 && gr < db.numRows() && gc >= 0 && gc < db.numCols()) {
+                                       db.set(gr, gc, yhat);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       protected double computeMxx(double c, Builtin builtin) {
+               return 0;
+       }
+
+       @Override
+       protected void computeColMxx(double[] c, Builtin builtin) {
+
+       }
+
+       @Override
+       protected void computeSum(double[] c, int nRows) {
+
+       }
+
+       @Override
+       protected void computeSumSq(double[] c, int nRows) {
+
+       }
+
+       @Override
+       protected void computeColSumsSq(double[] c, int nRows) {
+
+       }
+
+       @Override
+       protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) {
+
+       }
+
+       @Override
+       protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) {
+
+       }
+
+       @Override
+       protected void computeProduct(double[] c, int nRows) {
+
+       }
+
+       @Override
+       protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) {
+
+       }
+
+       @Override
+       protected void computeColProduct(double[] c, int nRows) {
+
+       }
+
+       @Override
+       protected double[] preAggSumRows() {
+               return new double[0];
+       }
+
+       @Override
+       protected double[] preAggSumSqRows() {
+               return new double[0];
+       }
+
+       @Override
+       protected double[] preAggProductRows() {
+               return new double[0];
+       }
+
+       @Override
+       protected double[] preAggBuiltinRows(Builtin builtin) {
+               return new double[0];
+       }
+
+       @Override
+       public boolean sameIndexStructure(AColGroupCompressed that) {
+               return false;
+       }
+
+       @Override
+       protected void tsmm(double[] result, int numColumns, int nRows) {
+
+       }
+
+       @Override
+       public AColGroup copyAndSet(IColIndex colIndexes) {
+               return null;
+       }
+
+       @Override
+       public void decompressToDenseBlockTransposed(DenseBlock db, int rl, int ru) {
+
+       }
+
+       @Override
+       public void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int nColOut) {
+
+       }
+
+       @Override
+       public double getIdx(int r, int colIdx) {
+               // ✅ CRUCIAL: Bounds-Check für colIdx!

Review Comment:
   Avoid emojis.
   Also, they are usually a hint of LLM-generated code (which is strictly forbidden for your submissions).



##########
src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java:
##########


Review Comment:
   Please revert



##########
src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java:
##########
@@ -19,8 +19,10 @@
 
 package org.apache.sysds.test.component.compress.colgroup;
 
+import static org.apache.sysds.runtime.compress.colgroup.ColGroupFactory.computeSegmentCost;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;

Review Comment:
   Remove jupiter assertions, that will cause the build to fail as we don't use 
jupiter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
