This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new aefed8f945 [MINOR] LibMatrixAgg sum operator without KAHAN
aefed8f945 is described below
commit aefed8f9456d4da52f67849f2056d3f614678ecd
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Fri Jan 5 13:01:45 2024 +0100
[MINOR] LibMatrixAgg sum operator without KAHAN
---
.../sysds/runtime/matrix/data/LibMatrixAgg.java | 205 +++++++++++++++++----
1 file changed, 165 insertions(+), 40 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
index 70ee962162..0891d7f1ae 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types.CorrectionLocationType;
@@ -51,6 +52,7 @@ import org.apache.sysds.runtime.functionobjects.KahanPlus;
import org.apache.sysds.runtime.functionobjects.KahanPlusSq;
import org.apache.sysds.runtime.functionobjects.Mean;
import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.functionobjects.ReduceAll;
import org.apache.sysds.runtime.functionobjects.ReduceCol;
import org.apache.sysds.runtime.functionobjects.ReduceDiag;
@@ -106,6 +108,8 @@ public class LibMatrixAgg {
private enum AggType {
KAHAN_SUM,
KAHAN_SUM_SQ,
+ SUM,
+ SUM_SQ,
CUM_KAHAN_SUM,
CUM_MIN,
CUM_MAX,
@@ -686,10 +690,12 @@ public class LibMatrixAgg {
return AggType.KAHAN_SUM_SQ;
}
+ final boolean rAll_rCol_rRow = ifn instanceof ReduceAll || ifn
instanceof ReduceCol || ifn instanceof ReduceRow;
+
//mean
if( vfn instanceof Mean
&& (op.aggOp.correction ==
CorrectionLocationType.LASTTWOCOLUMNS || op.aggOp.correction ==
CorrectionLocationType.LASTTWOROWS)
- && (ifn instanceof ReduceAll || ifn instanceof
ReduceCol || ifn instanceof ReduceRow) )
+ && rAll_rCol_rRow )
{
return AggType.MEAN;
}
@@ -699,22 +705,20 @@ public class LibMatrixAgg {
&& ((CM) vfn).getAggOpType() ==
AggregateOperationTypes.VARIANCE
&& (op.aggOp.correction ==
CorrectionLocationType.LASTFOURCOLUMNS ||
op.aggOp.correction ==
CorrectionLocationType.LASTFOURROWS)
- && (ifn instanceof ReduceAll || ifn instanceof
ReduceCol || ifn instanceof ReduceRow) )
+ && rAll_rCol_rRow )
{
return AggType.VAR;
}
//prod
- if( vfn instanceof Multiply
- && (ifn instanceof ReduceAll || ifn instanceof
ReduceCol || ifn instanceof ReduceRow))
- {
+ if(vfn instanceof Multiply && rAll_rCol_rRow)
return AggType.PROD;
- }
- //min / max
- if( vfn instanceof Builtin &&
- (ifn instanceof ReduceAll || ifn instanceof ReduceCol ||
ifn instanceof ReduceRow) )
- {
+ if(vfn instanceof Plus && rAll_rCol_rRow)
+ return AggType.SUM;
+
+ // min / max
+ if(vfn instanceof Builtin && rAll_rCol_rRow) {
BuiltinCode bfcode = ((Builtin)vfn).bFunc;
switch( bfcode ){
case MAX: return AggType.MAX;
@@ -1470,6 +1474,19 @@ public class LibMatrixAgg {
d_uakptrace(a, c, n, kbuff,
(KahanPlus)vFn, rl, ru);
break;
}
+ case SUM:{
+ if(a instanceof DenseBlockFP64DEDUP)
+ throw new NotImplementedException();
+ else if(ixFn instanceof ReduceAll) // SUM
+ d_uap(a, c, n, rl, ru);
+ else if(ixFn instanceof ReduceCol) // ROWSUM
+ d_uarp(a, c, n, rl, ru);
+ else if(ixFn instanceof ReduceRow) // COLSUM
+ d_uacp(a, c, n, rl, ru);
+ else if(ixFn instanceof ReduceDiag) // TRACE
+ throw new NotImplementedException();
+ break;
+ }
case KAHAN_SUM_SQ: { //SUM_SQ via k+,
KahanObject kbuff = new KahanObject(0, 0);
if( ixFn instanceof ReduceAll ) //SUM_SQ
@@ -1577,6 +1594,17 @@ public class LibMatrixAgg {
s_uakptrace(a, c, n, kbuff,
(KahanPlus)vFn, rl, ru);
break;
}
+ case SUM:{
+ if( ixFn instanceof ReduceAll ) // SUM
+ s_uap(a, c, n, rl, ru);
+ else if( ixFn instanceof ReduceCol ) //ROWSUM
+ s_uarp(a, c, n, rl, ru);
+ else if( ixFn instanceof ReduceRow ) //COLSUM
+ s_uacp(a, c, n, rl, ru);
+ else if( ixFn instanceof ReduceDiag ) //TRACE
+ throw new NotImplementedException();
+ break;
+ }
case KAHAN_SUM_SQ: { //SUM_SQ via k+
KahanObject kbuff = new KahanObject(0, 0);
if( ixFn instanceof ReduceAll ) //SUM_SQ
@@ -1666,8 +1694,6 @@ public class LibMatrixAgg {
DenseBlock da = in.getDenseBlock();
DenseBlock dc = out.getDenseBlock();
- double[] a = in.getDenseBlockValues();
- double[] c = out.getDenseBlockValues();
switch( optype ) {
case CUM_KAHAN_SUM: { //CUMSUM
@@ -1683,11 +1709,19 @@ public class LibMatrixAgg {
break;
}
case CUM_PROD: { //CUMPROD
+ if(!da.isContiguous())
+ throw new NotImplementedException("Not
implemented large block Cum Prod : " + optype);
+ double[] a = in.getDenseBlockValues();
+ double[] c = out.getDenseBlockValues();
d_ucumm(a, agg, c, n, rl, ru);
break;
}
case CUM_MIN:
case CUM_MAX: {
+ if(!da.isContiguous())
+ throw new NotImplementedException("Not
implemented large block Cum min or max " + optype);
+ double[] a = in.getDenseBlockValues();
+ double[] c = out.getDenseBlockValues();
double init = (optype==AggType.CUM_MAX)?
Double.NEGATIVE_INFINITY:Double.POSITIVE_INFINITY;
d_ucummxx(a, agg, c, n, init, (Builtin)vFn, rl,
ru);
break;
@@ -1739,6 +1773,8 @@ public class LibMatrixAgg {
double val = Double.NaN;
switch( optype ) {
case PROD: val = 1; break;
+ case SUM:
+ case SUM_SQ:
case KAHAN_SUM:
case KAHAN_SUM_SQ: val = 0; break;
case MIN: val =
Double.POSITIVE_INFINITY; break;
@@ -1754,8 +1790,9 @@ public class LibMatrixAgg {
}
//handle pseudo sparse-safe operations over empty inputs
- if(optype==AggType.KAHAN_SUM || optype==AggType.KAHAN_SUM_SQ
- || optype==AggType.MIN || optype==AggType.MAX
|| optype==AggType.PROD
+ if(optype == AggType.KAHAN_SUM || optype == AggType.KAHAN_SUM_SQ
+ || optype == AggType.SUM || optype ==
AggType.SUM_SQ
+ || optype == AggType.MIN || optype ==
AggType.MAX || optype == AggType.PROD
|| optype == AggType.CUM_KAHAN_SUM || optype ==
AggType.CUM_PROD
|| optype == AggType.CUM_MIN || optype ==
AggType.CUM_MAX)
{
@@ -1821,11 +1858,11 @@ public class LibMatrixAgg {
/**
* SUM, opcode: uak+, dense input.
*
- * @param a ?
- * @param c ?
- * @param n ?
- * @param kbuff ?
- * @param kplus ?
+ * @param a Input block
+ * @param c Output block
+ * @param n Input block number of columns
+ * @param kbuff Kahan addition buffer
+ * @param kplus Kahan plus operator
* @param rl row lower index
* @param ru row upper index
*/
@@ -1843,15 +1880,29 @@ public class LibMatrixAgg {
}
c.set(kbuff);
}
+
+ /**
+  * SUM via plain addition (no Kahan compensation), dense input.
+  *
+  * Adds up every cell of rows [rl, ru) of the input, walking the underlying
+  * physical blocks one after another, and hands the total to the output
+  * block through {@code c.set(double)}.
+  *
+  * @param a  Input block
+  * @param c  Output block
+  * @param n  Number of columns in the input block
+  * @param rl row lower index (inclusive)
+  * @param ru row upper index (exclusive)
+  */
+ private static void d_uap(DenseBlock a, DenseBlock c, int n, int rl, int ru) {
+ 	final int bil = a.index(rl);
+ 	final int biu = a.index(ru - 1);
+ 	double runningSum = 0.0;
+ 	for(int bi = bil; bi <= biu; bi++) { // for each physical block
+ 		final double[] aVals = a.valuesAt(bi);
+ 		// only the first block may start at an offset inside the block
+ 		final int lpos = (bi == bil) ? a.pos(rl) : 0;
+ 		// The last block ends right after row ru-1. Every other block is
+ 		// consumed up to its own end (blockSize * n), NOT up to
+ 		// lpos + blockSize * n, which would run past the block whenever the
+ 		// range starts mid-block.
+ 		// NOTE(review): assumes valuesAt(bi) holds blockSize(bi) * n cells - confirm.
+ 		final int end = (bi == biu) ? a.pos(ru - 1) + n : a.blockSize(bi) * n;
+ 		for(int i = lpos; i < end; i++)
+ 			runningSum += aVals[i];
+ 	}
+ 	c.set(runningSum);
+ }
/**
* ROWSUM, opcode: uark+, dense input.
*
- * @param a ?
- * @param c ?
- * @param n ?
- * @param kbuff ?
- * @param kplus ?
+ * @param a Input matrix to rowSum
+ * @param c Output matrix to set the row sums into
+ * @param n The number of columns in the input
+ * @param kbuff Kahan buffer
+ * @param kplus Kahan plus operator
* @param rl row lower index
* @param ru row upper index
*/
@@ -1867,17 +1918,28 @@ public class LibMatrixAgg {
}
}
}
-
+
+ /**
+  * ROWSUM via plain addition (no Kahan compensation), dense input.
+  *
+  * For each row in [rl, ru) the n cells of that row are added up and the
+  * result is written to column 0 of the same row in the output block.
+  *
+  * @param a  Input block
+  * @param c  Output block receiving one sum per row
+  * @param n  Number of columns in the input block
+  * @param rl row lower index (inclusive)
+  * @param ru row upper index (exclusive)
+  */
+ private static void d_uarp(DenseBlock a, DenseBlock c, int n, int rl, int ru) {
+ 	int row = rl;
+ 	while(row < ru) {
+ 		final int begin = a.pos(row);
+ 		final double[] rowVals = a.values(row);
+ 		final int end = begin + n;
+ 		double rowSum = 0.0;
+ 		for(int k = begin; k < end; k++)
+ 			rowSum += rowVals[k];
+ 		c.set(row, 0, rowSum);
+ 		row++;
+ 	}
+ }
+
/**
* COLSUM, opcode: uack+, dense input.
*
- * @param a ?
- * @param c ?
- * @param n ?
- * @param kbuff ?
- * @param kplus ?
- * @param rl row lower index
- * @param ru row upper index
+ * @param a Input block
+ * @param c Output block
+ * @param n Number of columns in the input
+ * @param kbuff Kahan buffer
+ * @param kplus Kahan plus operator
+ * @param rl row lower index
+ * @param ru row upper index
*/
private static void d_uackp( DenseBlock a, DenseBlock c, int n,
KahanObject kbuff, KahanPlus kplus, int rl, int ru ) {
if(a instanceof DenseBlockFP64DEDUP)
@@ -1888,6 +1950,18 @@ public class LibMatrixAgg {
}
}
+ private static void d_uacp( DenseBlock a, DenseBlock c, int n, int rl,
int ru ) {
+ // Output always a vector.
+ double[] cVals = c.values(0);
+ for(int r = rl; r < ru; r++){
+ int apos = a.pos(r);
+ double[] avals = a.values(r);
+ for(int i = 0; i < n; i++){
+ cVals[i] += avals[apos + i];
+ }
+ }
+ }
+
/**
* SUM_SQ, opcode: uasqk+, dense input.
*
@@ -2407,7 +2481,7 @@ public class LibMatrixAgg {
/**
* SUM, opcode: uak+, sparse input.
*
- * @param a dense input block
+ * @param a Sparse input block
* @param c dense output block
* @param n number columns in input
* @param kbuff Kahn buffer
@@ -2429,16 +2503,33 @@ public class LibMatrixAgg {
c.set(kbuff);
}
+
+ /**
+  * SUM via plain addition (no Kahan compensation), sparse input.
+  *
+  * Adds up all stored values of rows [rl, ru) and hands the total to the
+  * output block through {@code c.set(double)}. Contiguous sparse layouts are
+  * scanned as one value range; otherwise the rows are summed one by one.
+  *
+  * @param a  Sparse input block
+  * @param c  Dense output block
+  * @param n  Number of columns in the input block (not used here)
+  * @param rl row lower index (inclusive)
+  * @param ru row upper index (exclusive)
+  */
+ private static void s_uap(SparseBlock a, DenseBlock c, int n, int rl, int ru) {
+ 	double tmp = 0.0;
+ 	if(a.isContiguous()) {
+ 		final double[] aVal = a.values(rl);
+ 		final int s = a.pos(rl);
+ 		// size(rl, ru) is a count of values, so the exclusive end index is
+ 		// start + count; using the count itself as the end drops values
+ 		// whenever the range does not begin at position 0.
+ 		final long e = s + a.size(rl, ru);
+ 		for(int i = s; i < e; i++)
+ 			tmp += aVal[i];
+ 	}
+ 	else {
+ 		for(int i = rl; i < ru; i++)
+ 			tmp += s_sumRow(a, i);
+ 	}
+ 	c.set(tmp);
+ }
+
/**
* ROWSUM, opcode: uark+, sparse input.
*
- * @param a ?
- * @param c ?
- * @param n ?
- * @param kbuff ?
- * @param kplus ?
- * @param rl row lower index
- * @param ru row upper index
+ * @param a Sparse block to row sum on
+ * @param c Dense output block
+ * @param n Number of columns in the input block
+ * @param kbuff Kahan buffer
+ * @param kplus Kahan plus operator
+ * @param rl Row lower index
+ * @param ru Row upper index
*/
private static void s_uarkp( SparseBlock a, DenseBlock c, int n,
KahanObject kbuff, KahanPlus kplus, int rl, int ru ) {
//compute row aggregates
@@ -2449,6 +2540,25 @@ public class LibMatrixAgg {
c.set(i, kbuff);
}
}
+
+ private static void s_uarp(SparseBlock a, DenseBlock c, int n, int rl,
int ru) {
+ // compute row aggregates
+ for(int i = rl; i < ru; i++)
+ c.set(i, 0, s_sumRow(a, i));
+ }
+
+ /**
+  * Sums the stored values of a single sparse row using plain addition.
+  *
+  * @param a Sparse input block
+  * @param r Row index
+  * @return the sum of the row's stored values, or 0.0 if the row is empty
+  */
+ private static double s_sumRow(SparseBlock a, int r) {
+ 	double rowSum = 0.0;
+ 	if(!a.isEmpty(r)) {
+ 		final double[] vals = a.values(r);
+ 		final int start = a.pos(r);
+ 		final int stop = start + a.size(r);
+ 		for(int k = start; k < stop; k++)
+ 			rowSum += vals[k];
+ 	}
+ 	return rowSum;
+ }
+
/**
* COLSUM, opcode: uack+, sparse input.
@@ -2475,6 +2585,16 @@ public class LibMatrixAgg {
}
}
+ /**
+  * COLSUM via plain addition (no Kahan compensation), sparse input.
+  *
+  * Scatters the stored values of rows [rl, ru) into the value array backing
+  * row 0 of the output block, adding each value at its column index.
+  * Contiguous sparse layouts are processed in a single pass; otherwise each
+  * non-empty row is processed on its own.
+  *
+  * @param a  Sparse input block
+  * @param c  Dense output block (treated as a single row vector)
+  * @param n  Number of columns in the input block
+  * @param rl row lower index (inclusive)
+  * @param ru row upper index (exclusive)
+  */
+ private static void s_uacp(SparseBlock a, DenseBlock c, int n, int rl, int ru) {
+ 	final double[] colSums = c.values(0);
+ 	if(a.isContiguous()) {
+ 		// one pass over the whole value range of rows [rl, ru)
+ 		sumAgg(a.values(rl), colSums, a.indexes(rl), a.pos(rl), (int) a.size(rl, ru), n);
+ 		return;
+ 	}
+ 	for(int row = rl; row < ru; row++) {
+ 		if(a.isEmpty(row))
+ 			continue;
+ 		sumAgg(a.values(row), colSums, a.indexes(row), a.pos(row), a.size(row), n);
+ 	}
+ }
+
/**
* SUM_SQ, opcode: uasqk+, sparse input.
*
@@ -3248,6 +3368,11 @@ public class LibMatrixAgg {
corr[pos1+ix] = kbuff._correction;
}
}
+
+ private static void sumAgg(double[] a, double[] c, int[] aix, int ai,
final int len, final int n) {
+ for(int i = ai; i < ai + len; i++)
+ c[aix[i]] += a[i];
+ }
private static double product( double[] a, int ai, final int len ) {
double val = 1;