Repository: incubator-systemml Updated Branches: refs/heads/master b78c12593 -> bbefe96b2
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/codegen/LibSpoofPrimitives.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/LibSpoofPrimitives.java b/src/main/java/org/apache/sysml/runtime/codegen/LibSpoofPrimitives.java new file mode 100644 index 0000000..c73456d --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/codegen/LibSpoofPrimitives.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.codegen; + +import org.apache.sysml.runtime.matrix.data.LibMatrixMult; + +/** + * This library contains all vector primitives that are used in + * generated source code for fused operators. For primitives that + * exist in LibMatrixMult, these calls are simply forwarded to + * ensure consistency in performance and result correctness. 
+ * + */ +public class LibSpoofPrimitives +{ + // forwarded calls to LibMatrixMult + + public static double dotProduct( double[] a, double[] b, int ai, int bi, final int len ) { + return LibMatrixMult.dotProduct(a, b, ai, bi, len); + } + + public static double dotProduct( double[] a, double[] b, int[] aix, int ai, final int bi, final int len ) { + return LibMatrixMult.dotProduct(a, b, aix, ai, bi, len); + } + + public static void vectMultiplyAdd( final double aval, double[] b, double[] c, int bi, int ci, final int len ) { + LibMatrixMult.vectMultiplyAdd(aval, b, c, bi, ci, len); + } + + public static void vectMultiplyAdd( final double aval, double[] b, double[] c, int[] bix, final int bi, final int ci, final int len ) { + LibMatrixMult.vectMultiplyAdd(aval, b, c, bix, bi, ci, len); + } + + public static void vectMultiplyWrite( final double aval, double[] b, double[] c, int bi, int ci, final int len ) { + LibMatrixMult.vectMultiplyWrite(aval, b, c, bi, ci, len); + } + + // custom methods + + /** + * Computes c = sum(A), where A is a dense vectors. + * + * @param a dense input vector A + * @param ai start position in A + * @param len number of processed elements + * @return sum value + */ + public static double vectSum( double[] a, int ai, final int len ) { + double val = 0; + final int bn = len%8; + + //compute rest + for( int i = 0; i < bn; i++, ai++ ) + val += a[ ai ]; + + //unrolled 8-block (for better instruction-level parallelism) + for( int i = bn; i < len; i+=8, ai+=8 ) { + //read 64B cacheline of a, compute cval' = sum(a) + cval + val += a[ ai+0 ] + a[ ai+1 ] + a[ ai+2 ] + a[ ai+3 ] + + a[ ai+4 ] + a[ ai+5 ] + a[ ai+6 ] + a[ ai+7 ]; + } + + //scalar result + return val; + } + + /** + * Computes c = sum(A), where A is a sparse vector. 
+ * + * @param avals sparse input vector A values A + * @param aix sparse input vector A column indexes + * @param ai start position in A + * @param len number of processed elements + * @return sum value + */ + public static double vectSum( double[] avals, int[] aix, int ai, int len) { + double val = 0; + final int bn = len%8; + + //compute rest + for( int i = ai; i < ai+bn; i++ ) + val += avals[ ai+aix[i] ]; + + //unrolled 8-block (for better instruction-level parallelism) + for( int i = ai+bn; i < ai+len; i+=8 ) + { + //read 64B of a via 'gather' + //compute cval' = sum(a) + cval + val += avals[ ai+aix[i+0] ] + avals[ ai+aix[i+1] ] + + avals[ ai+aix[i+2] ] + avals[ ai+aix[i+3] ] + + avals[ ai+aix[i+4] ] + avals[ ai+aix[i+5] ] + + avals[ ai+aix[i+6] ] + avals[ ai+aix[i+7] ]; + } + + //scalar result + return val; + } + + /** + * Computes C += A / b, where C and A are dense vectors and b is a scalar. + * + * @param a dense input vector A + * @param bval input scalar b + * @param c dense input-output vector C + * @param ai start position in A + * @param ci start position in C + * @param len number of processed elements. + */ + public static void vectDivAdd( double[] a, final double bval, double[] c, int ai, int ci, final int len ) + { + final int bn = len%8; + + //rest, not aligned to 8-blocks + for( int j = 0; j < bn; j++, ai++, ci++) + c[ ci ] += a[ ai ] / bval; + + //unrolled 8-block (for better instruction-level parallelism) + for( int j = bn; j < len; j+=8, ai+=8, ci+=8) + { + //read 64B cachelines of b and c + //compute c' = aval * b + c + //write back 64B cacheline of c = c' + c[ ci+0 ] += a[ ai+0 ] / bval; + c[ ci+1 ] += a[ ai+1 ] / bval; + c[ ci+2 ] += a[ ai+2 ] / bval; + c[ ci+3 ] += a[ ai+3 ] / bval; + c[ ci+4 ] += a[ ai+4 ] / bval; + c[ ci+5 ] += a[ ai+5 ] / bval; + c[ ci+6 ] += a[ ai+6 ] / bval; + c[ ci+7 ] += a[ ai+7 ] / bval; + } + } + + /** + * Computes C += A / b, where C is a dense vector, A is a sparse vector, and b is a scalar. 
+ * + * @param a sparse input vector A values + * @param bval input scalar b + * @param c dense input-output vector C + * @param aix sparse input vector A column indexes + * @param ai start position in A + * @param ci start position in C + * @param len number of processed elements. + */ + public static void vectDivAdd( double[] a, final double bval, double[] c, int[] aix, final int ai, final int ci, final int len ) + { + final int bn = len%8; + + //rest, not aligned to 8-blocks + for( int j = ai; j < ai+bn; j++ ) + c[ ci + aix[j] ] += a[ j ] / bval; + + //unrolled 8-block (for better instruction-level parallelism) + for( int j = ai+bn; j < ai+len; j+=8 ) + { + //read 64B cacheline of b + //read 64B of c via 'gather' + //compute c' = aval * b + c + //write back 64B of c = c' via 'scatter' + c[ ci+aix[j+0] ] += a[ j+0 ] / bval; + c[ ci+aix[j+1] ] += a[ j+1 ] / bval; + c[ ci+aix[j+2] ] += a[ j+2 ] / bval; + c[ ci+aix[j+3] ] += a[ j+3 ] / bval; + c[ ci+aix[j+4] ] += a[ j+4 ] / bval; + c[ ci+aix[j+5] ] += a[ j+5 ] / bval; + c[ ci+aix[j+6] ] += a[ j+6 ] / bval; + c[ ci+aix[j+7] ] += a[ j+7 ] / bval; + } + } + + /** + * Computes C = A / b, where C and A are dense vectors, and b is a scalar. + * + * @param a dense input vector A + * @param bval input scalar b + * @param c dense input-output vector C + * @param ai start position in A + * @param ci start position in C + * @param len number of processed elements. 
+ */ + public static void vectDivWrite( double[] a, final double bval, double[] c, int ai, int ci, final int len ) + { + final int bn = len%8; + + //rest, not aligned to 8-blocks + for( int j = 0; j < bn; j++, ai++, ci++) + c[ ci ] = a[ ai ] / bval; + + //unrolled 8-block (for better instruction-level parallelism) + for( int j = bn; j < len; j+=8, ai+=8, ci+=8) + { + //read 64B cachelines of a and c + //compute c' = a / bval + c + //write back 64B cacheline of c = c' + c[ ci+0 ] = a[ ai+0 ] / bval; + c[ ci+1 ] = a[ ai+1 ] / bval; + c[ ci+2 ] = a[ ai+2 ] / bval; + c[ ci+3 ] = a[ ai+3 ] / bval; + c[ ci+4 ] = a[ ai+4 ] / bval; + c[ ci+5 ] = a[ ai+5 ] / bval; + c[ ci+6 ] = a[ ai+6 ] / bval; + c[ ci+7 ] = a[ ai+7 ] / bval; + } + } + + /** + * Computes C = A / b, where C is a dense vector and A is a sparse vector, and b is a scalar. + * + * @param a sparse input vector A values + * @param aix sparse input vector A column indexes + * @param bval input scalar b + * @param c dense input-output vector C + * @param ai start position in A + * @param ci start position in C + * @param len number of processed elements. 
+ */ + public static void vectDivWrite( double[] a, int[] aix, final double bval, double[] c, final int ai, final int ci, final int len ) + { + final int bn = len%8; + + //rest, not aligned to 8-blocks + for( int j = ai; j < ai+bn; j++ ) + c[ ci + aix[j] ] += a[ j ] / bval; + + //unrolled 8-block (for better instruction-level parallelism) + for( int j = ai+bn; j < ai+len; j+=8 ) + { + //read 64B cachelines of a, compute c = a/b + //and write back c via 'scatter' + c[ ci+aix[j+0] ] = a[ j+0 ] / bval; + c[ ci+aix[j+1] ] = a[ j+1 ] / bval; + c[ ci+aix[j+2] ] = a[ j+2 ] / bval; + c[ ci+aix[j+3] ] = a[ j+3 ] / bval; + c[ ci+aix[j+4] ] = a[ j+4 ] / bval; + c[ ci+aix[j+5] ] = a[ j+5 ] / bval; + c[ ci+aix[j+6] ] = a[ j+6 ] / bval; + c[ ci+aix[j+7] ] = a[ j+7 ] / bval; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java new file mode 100644 index 0000000..51c5164 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.codegen; + +import java.io.Serializable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.functionobjects.KahanPlus; +import org.apache.sysml.runtime.instructions.cp.DoubleObject; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.util.UtilFunctions; + +public abstract class SpoofCellwise extends SpoofOperator implements Serializable +{ + private static final long serialVersionUID = 3442528770573293590L; + private static final long PAR_NUMCELL_THRESHOLD = 1024*1024; //Min 1M elements + + public enum CellType { + NO_AGG, + FULL_AGG, + ROW_AGG, + } + + protected CellType _type = CellType.NO_AGG; + + public SpoofCellwise() { + + } + + public CellType getCellType() { + return _type; + } + + @Override + public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, int k) + throws DMLRuntimeException + { + //sanity check + if( inputs==null || inputs.size() < 1 ) + throw new RuntimeException("Invalid input arguments."); + + if( inputs.get(0).getNumRows()*inputs.get(0).getNumColumns()<PAR_NUMCELL_THRESHOLD ) { + k = 1; //serial execution + } + + //input preparation + double[][] b = prepInputMatrices(inputs); + double[] scalars = prepInputScalars(scalarObjects); + + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + double sum = 0; + if( k <= 1 ) //SINGLE-THREADED + { + sum = ( 
!inputs.get(0).isInSparseFormat() ) ? + executeDenseAndAgg(inputs.get(0).getDenseBlock(), b, scalars, n, m, 0, m) : + executeSparseAndAgg(inputs.get(0).getSparseBlock(), b, scalars, n, m, 0, m); + } + else //MULTI-THREADED + { + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + ArrayList<ParAggTask> tasks = new ArrayList<ParAggTask>(); + int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); + int blklen = (int)(Math.ceil((double)m/nk)); + for( int i=0; i<nk & i*blklen<m; i++ ) + tasks.add(new ParAggTask(inputs.get(0), b, scalars, n, m,i*blklen, Math.min((i+1)*blklen, m))); + //execute tasks + List<Future<Double>> taskret = pool.invokeAll(tasks); + pool.shutdown(); + + //aggregate partial results + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + for( Future<Double> task : taskret ) + kplus.execute2(kbuff, task.get()); + sum = kbuff._sum; + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } + return new DoubleObject(sum); + } + + @Override + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out) + throws DMLRuntimeException + { + execute(inputs, scalarObjects, out, 1); + } + + @Override + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k) + throws DMLRuntimeException + { + //sanity check + if( inputs==null || inputs.size() < 1 || out==null ) + throw new RuntimeException("Invalid input arguments."); + + if( inputs.get(0).getNumRows()*inputs.get(0).getNumColumns()<PAR_NUMCELL_THRESHOLD ) { + k = 1; //serial execution + } + + //result allocation and preparations + out.reset(inputs.get(0).getNumRows(), _type == CellType.NO_AGG ? 
+ inputs.get(0).getNumColumns() : 1, false); + out.allocateDenseBlock(); + double[] c = out.getDenseBlock(); + + //input preparation + double[][] b = prepInputMatrices(inputs); + double[] scalars = prepInputScalars(scalarObjects); + + //core sequential execute + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + + long lnnz = 0; + if( k <= 1 ) //SINGLE-THREADED + { + lnnz = (!inputs.get(0).isInSparseFormat()) ? + executeDense(inputs.get(0).getDenseBlock(), b, scalars, c, n, m, 0, m) : + executeSparse(inputs.get(0).getSparseBlock(), b, scalars, c, n, m, 0, m); + } + else //MULTI-THREADED + { + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + ArrayList<ParExecTask> tasks = new ArrayList<ParExecTask>(); + int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); + int blklen = (int)(Math.ceil((double)m/nk)); + for( int i=0; i<nk & i*blklen<m; i++ ) + tasks.add(new ParExecTask(inputs.get(0), b, scalars, c, + n, m, i*blklen, Math.min((i+1)*blklen, m))); + //execute tasks + List<Future<Long>> taskret = pool.invokeAll(tasks); + pool.shutdown(); + + //aggregate nnz and error handling + for( Future<Long> task : taskret ) + lnnz += task.get(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } + + //post-processing + out.setNonZeros(lnnz); + out.examSparsity(); + } + + /** + * + * @param a + * @param b + * @param c + * @param n + * @param rl + * @param ru + */ + private double executeDenseAndAgg(double[] a, double[][] b, double[] scalars, int n, int m, int rl, int ru) + { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + + if( a == null ) { //empty + //note: we can't determine sparse-safeness by executing the operator once + //as the output might change with different row indices + for( int i=rl; i<ru; i++ ) + for( int j=0; j<n; j++ ) + kplus.execute2(kbuff, genexecDense( 0, b, scalars, n, m, i, j )); + } + else { //general case + for( int 
i=rl, ix=rl*n; i<ru; i++ ) + for( int j=0; j<n; j++, ix++ ) + kplus.execute2(kbuff, genexecDense( a[ix], b, scalars, n, m, i, j )); + } + + return kbuff._sum; + } + + private long executeDense(double[] a, double[][] b,double[] scalars, double[] c, int n, int m, int rl, int ru) + { + long lnnz = 0; + + if( _type == CellType.NO_AGG ) + { + if( a == null ) { //empty + //note: we can't determine sparse-safeness by executing the operator once + //as the output might change with different row indices + for( int i=rl, ix=rl*n; i<ru; i++ ) + for( int j=0; j<n; j++, ix++ ) { + c[ix] = genexecDense( 0, b, scalars, n, m, i, j ); + lnnz += (c[ix]!=0) ? 1 : 0; + } + } + else { //general case + for( int i=rl, ix=rl*n; i<ru; i++ ) + for( int j=0; j<n; j++, ix++ ) { + c[ix] = genexecDense( a[ix], b, scalars, n, m, i, j); + lnnz += (c[ix]!=0) ? 1 : 0; + } + } + } + else if( _type == CellType.ROW_AGG ) + { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + + if( a == null ) { //empty + //note: we can't determine sparse-safeness by executing the operator once + //as the output might change with different row indices + for( int i=rl; i<ru; i++ ) { + kbuff.set(0, 0); + for( int j=0; j<n; j++ ) + kplus.execute2(kbuff, genexecDense( 0, b, scalars, n, m, i, j )); + c[i] = kbuff._sum; + lnnz += (c[i]!=0) ? 1 : 0; + } + } + else { //general case + for( int i=rl, ix=rl*n; i<ru; i++ ) { + kbuff.set(0, 0); + for( int j=0; j<n; j++, ix++ ) + kplus.execute2(kbuff, genexecDense( a[ix], b, scalars, n, m, i, j )); + c[i] = kbuff._sum; + lnnz += (c[i]!=0) ? 
1 : 0; + } + } + } + + return lnnz; + } + + private double executeSparseAndAgg(SparseBlock sblock, double[][] b, double[] scalars, int n, int m, int rl, int ru) + { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + + //TODO rework sparse safe test + double val = genexecDense( 0, b, scalars, n, m, 0, 0 ); + + if(val == 0 && b.length==0) // sparse safe + { + if( sblock != null ) { + for( int i=rl; i<ru; i++ ) + if( !sblock.isEmpty(i) ) { + int apos = sblock.pos(i); + int alen = sblock.size(i); + double[] avals = sblock.values(i); + for( int j=apos; j<apos+alen; j++ ) { + kplus.execute2( kbuff, genexecDense(avals[j], b, scalars, n, m, i, j)); + } + } + } + } + else //sparse-unsafe + { + for(int i=rl; i<ru; i++) + for(int j=0; j<n; j++) { + double valij = (sblock != null) ? sblock.get(i, j) : 0; + kplus.execute2( kbuff, genexecDense(valij, b, scalars, n, m, i, j)); + } + } + + return kbuff._sum; + } + + private long executeSparse(SparseBlock sblock, double[][] b, double[] scalars, double[] c, int n, int m, int rl, int ru) + { + //TODO rework sparse safe test + double val0 = genexecDense( 0, b, scalars, n, m, 0, 0 ); + long lnnz = 0; + + if( _type == CellType.NO_AGG ) + { + if(val0 == 0 && b.length == 0) // sparse safe + { + if( sblock != null ) { + for( int i=rl; i<ru; i++ ) + if( !sblock.isEmpty(i) ) { + int apos = sblock.pos(i); + int alen = sblock.size(i); + double[] avals = sblock.values(i); + for( int j=apos; j<apos+alen; j++ ) { + double val = genexecDense(avals[j], b, scalars, n, m, i, j); + c[i*n+sblock.indexes(i)[j]] = val; + lnnz += (val!=0) ? 1 : 0; + } + } + } + } + else //sparse-unsafe + { + for(int i=rl, cix=rl*n; i<ru; i++, cix+=n) + for(int j=0; j<n; j++) { + double valij = (sblock != null) ? sblock.get(i, j) : 0; + c[cix+j] = genexecDense(valij, b, scalars, n, m, i, j); + lnnz += (c[cix+j]!=0) ? 
1 : 0; + } + } + } + else if( _type == CellType.ROW_AGG ) + { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + + if(val0 == 0 && b.length == 0) // sparse safe + { + if( sblock != null ) { + for( int i=rl; i<ru; i++ ) { + if( sblock.isEmpty(i) ) continue; + kbuff.set(0, 0); + int apos = sblock.pos(i); + int alen = sblock.size(i); + double[] avals = sblock.values(i); + for( int j=apos; j<apos+alen; j++ ) { + kplus.execute2(kbuff, genexecDense(avals[j], b, scalars, n, m, i, j)); + } + c[i] = kbuff._sum; + lnnz += (c[i]!=0) ? 1 : 0; + } + } + } + else //sparse-unsafe + { + for(int i=rl; i<ru; i++) { + kbuff.set(0, 0); + for(int j=0; j<n; j++) { + double valij = (sblock != null) ? sblock.get(i, j) : 0; + kplus.execute2( kbuff, genexecDense(valij, b, scalars, n, m, i, j)); + } + c[i] = kbuff._sum; + lnnz += (c[i]!=0) ? 1 : 0; + } + } + } + + return lnnz; + } + + protected abstract double genexecDense( double a, double[][] b, double[] scalars, int n, int m, int rowIndex, int colIndex); + + private class ParAggTask implements Callable<Double> + { + private final MatrixBlock _a; + private final double[][] _b; + private final double[] _scalars; + private final int _clen; + private final int _rlen; + private final int _rl; + private final int _ru; + + protected ParAggTask( MatrixBlock a, double[][] b, double[] scalars, int clen, int rlen, int rl, int ru ) { + _a = a; + _b = b; + _scalars = scalars; + _clen = clen; + _rlen = rlen; + _rl = rl; + _ru = ru; + } + + @Override + public Double call() throws DMLRuntimeException { + return ( !_a.isInSparseFormat()) ? 
+ executeDenseAndAgg(_a.getDenseBlock(), _b, _scalars, _clen, _rlen, _rl, _ru) : + executeSparseAndAgg(_a.getSparseBlock(), _b, _scalars, _clen, _rlen, _rl, _ru); + } + } + + private class ParExecTask implements Callable<Long> + { + private final MatrixBlock _a; + private final double[][] _b; + private final double[] _scalars; + private final double[] _c; + private final int _clen; + private final int _rlen; + private final int _rl; + private final int _ru; + + protected ParExecTask( MatrixBlock a, double[][] b, double[] scalars, double[] c, int clen, int rlen, int rl, int ru ) { + _a = a; + _b = b; + _scalars = scalars; + _c = c; + _clen = clen; + _rlen = rlen; + _rl = rl; + _ru = ru; + } + + @Override + public Long call() throws DMLRuntimeException { + return (!_a.isInSparseFormat()) ? + executeDense(_a.getDenseBlock(), _b, _scalars, _c, _clen, _rlen, _rl, _ru) : + executeSparse(_a.getSparseBlock(), _b, _scalars, _c, _clen, _rlen, _rl, _ru); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java new file mode 100644 index 0000000..ddbf96d --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.codegen; + +import java.io.Serializable; +import java.util.ArrayList; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; + +public abstract class SpoofOperator implements Serializable +{ + private static final long serialVersionUID = 3834006998853573319L; + + public abstract void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, MatrixBlock out) + throws DMLRuntimeException; + + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, MatrixBlock out, int k) + throws DMLRuntimeException + { + //default implementation serial execution + execute(inputs, scalars, out); + } + + public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars) throws DMLRuntimeException { + throw new RuntimeException("Invalid invocation in base class."); + } + + public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, int k) + throws DMLRuntimeException + { + //default implementation serial execution + return execute(inputs, scalars); + } + + protected double[][] prepInputMatrices(ArrayList<MatrixBlock> inputs) { + return prepInputMatrices(inputs, 1); + } + + protected double[][] prepInputMatrices(ArrayList<MatrixBlock> inputs, int offset) { + double[][] b = new double[inputs.size()-offset][]; + for(int i=offset; i < inputs.size(); i++) { + if( inputs.get(i).isEmptyBlock(false) && 
!inputs.get(i).isAllocated() ) + inputs.get(i).allocateDenseBlock(); + b[i-offset] = inputs.get(i).getDenseBlock(); + } + return b; + } + + protected double[] prepInputScalars(ArrayList<ScalarObject> scalarObjects) { + double[] scalars = new double[scalarObjects.size()]; + for(int i=0; i < scalarObjects.size(); i++) + scalars[i] = scalarObjects.get(i).getDoubleValue(); + return scalars; + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java new file mode 100644 index 0000000..a23ea5a --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOuterProduct.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.codegen; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.instructions.cp.DoubleObject; +import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock; + +public abstract class SpoofOuterProduct extends SpoofOperator +{ + private static final long serialVersionUID = 2948612259863710279L; + + private static final int L2_CACHESIZE = 256 * 1024; //256KB (common size) + + public enum OutProdType { + LEFT_OUTER_PRODUCT, + RIGHT_OUTER_PRODUCT, + CELLWISE_OUTER_PRODUCT, // (e.g., X*log(sigmoid(-(U%*%t(V))))) ) + AGG_OUTER_PRODUCT // (e.g.,sum(X*log(U%*%t(V)+eps))) ) + } + + protected OutProdType _outerProductType; + + public SpoofOuterProduct() { + + } + + public void setOuterProdType(OutProdType type) { + _outerProductType = type; + } + + public OutProdType getOuterProdType() { + return _outerProductType; + } + + @Override + public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects) + throws DMLRuntimeException + { + //sanity check + if( inputs==null || inputs.size() < 3 ) + throw new RuntimeException("Invalid input arguments."); + + //input preparation + double[][] b = prepInputMatrices(inputs, 3); + double[] scalars = prepInputScalars(scalarObjects); + + //core sequential execute + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + final int k = inputs.get(1).getNumColumns(); // rank + + //public static void matrixMultWDivMM(MatrixBlock mW, MatrixBlock mU, MatrixBlock mV, MatrixBlock mX, MatrixBlock ret, WDivMMType wt, int k) + MatrixBlock a = inputs.get(0); + 
MatrixBlock u = inputs.get(1); + MatrixBlock v = inputs.get(2); + + MatrixBlock out = new MatrixBlock(1, 1, false); + out.allocateDenseBlock(); + + if(!a.isInSparseFormat()) + executeCellwiseDense(a.getDenseBlock(), u.getDenseBlock(), v.getDenseBlock(), b, scalars, out.getDenseBlock(), n, m, k, _outerProductType, 0, m, 0, n); + else + executeCellwiseSparse(a.getSparseBlock(), u.getDenseBlock(), v.getDenseBlock(), b, scalars, out, n, m, k, (int) a.getNonZeros(), _outerProductType, 0, m, 0, n); + return new DoubleObject(out.getDenseBlock()[0]); + } + + @Override + public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, int numThreads) + throws DMLRuntimeException + { + //sanity check + if( inputs==null || inputs.size() < 3 ) + throw new RuntimeException("Invalid input arguments."); + + //input preparation + double[][] b = prepInputMatrices(inputs, 3); + double[] scalars = prepInputScalars(scalarObjects); + + //core sequential execute + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + final int k = inputs.get(1).getNumColumns(); // rank + double sum = 0; + + try + { + ExecutorService pool = Executors.newFixedThreadPool(k); + ArrayList<ParOuterProdAggTask> tasks = new ArrayList<ParOuterProdAggTask>(); + //create tasks (for wdivmm-left, parallelization over columns; + //for wdivmm-right, parallelization over rows; both ensure disjoint results) + int blklen = (int)(Math.ceil((double)m/numThreads)); + for( int i=0; i<numThreads & i*blklen<m; i++ ) + tasks.add(new ParOuterProdAggTask(inputs.get(0), inputs.get(1).getDenseBlock(), inputs.get(2).getDenseBlock(), b, scalars, n, m, k, _outerProductType, i*blklen, Math.min((i+1)*blklen,m), 0, n)); + //execute tasks + List<Future<Double>> taskret = pool.invokeAll(tasks); + pool.shutdown(); + for( Future<Double> task : taskret ) + sum += task.get(); + } + catch (Exception e) { + throw new DMLRuntimeException(e); + } + + return new 
DoubleObject(sum); + } + + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out) + throws DMLRuntimeException + { + //sanity check + if( inputs==null || inputs.size() < 3 || out==null ) + throw new RuntimeException("Invalid input arguments."); + + //check empty result + if( (_outerProductType == OutProdType.LEFT_OUTER_PRODUCT && inputs.get(1).isEmptyBlock(false)) //U is empty + || (_outerProductType == OutProdType.RIGHT_OUTER_PRODUCT && inputs.get(2).isEmptyBlock(false)) //V is empty + || (_outerProductType == OutProdType.CELLWISE_OUTER_PRODUCT && inputs.get(0).isEmptyBlock(false))) { //X is empty + out.examSparsity(); //turn empty dense into sparse + return; + } + + //input preparation and result allocation (Allocate the output that is set by Sigma2CPInstruction) + if(_outerProductType == OutProdType.CELLWISE_OUTER_PRODUCT) { + //assign it to the time and sparse representation of the major input matrix + out.reset(inputs.get(0).getNumRows(), inputs.get(0).getNumColumns(), inputs.get(0).isInSparseFormat()); + out.allocateDenseOrSparseBlock(); + } + else { + //if left outerproduct gives a value of k*n instead of n*k, change it back to n*k and then transpose the output + //if(_outerProductType == OutProdType.LEFT_OUTER_PRODUCT && out.getNumRows() == inputs.get(2).getNumColumns() && out.getNumColumns() == inputs.get(2).getNumRows()) + if(_outerProductType == OutProdType.LEFT_OUTER_PRODUCT ) + out.reset(inputs.get(0).getNumColumns(),inputs.get(1).getNumColumns()); // n*k + else if(_outerProductType == OutProdType.RIGHT_OUTER_PRODUCT ) + out.reset(inputs.get(0).getNumRows(),inputs.get(1).getNumColumns()); // m*k + out.allocateDenseBlock(); + } + + //input preparation + double[][] b = prepInputMatrices(inputs, 3); + double[] scalars = prepInputScalars(scalarObjects); + + //core sequential execute + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + final int k = 
inputs.get(1).getNumColumns(); // rank + + MatrixBlock a = inputs.get(0); + MatrixBlock u = inputs.get(1); + MatrixBlock v = inputs.get(2); + + switch(_outerProductType) { + case LEFT_OUTER_PRODUCT: + case RIGHT_OUTER_PRODUCT: + if( !a.isInSparseFormat() ) + executeDense(a.getDenseBlock(), u.getDenseBlock(), v.getDenseBlock(), b, scalars, out.getDenseBlock(), n, m, k, _outerProductType, 0, m, 0, n); + else + executeSparse(a.getSparseBlock(), u.getDenseBlock(), v.getDenseBlock(), b, scalars, out.getDenseBlock(), n, m, k, (int) a.getNonZeros(), _outerProductType, 0, m, 0, n); + break; + + case CELLWISE_OUTER_PRODUCT: + if( !a.isInSparseFormat() ) + executeCellwiseDense(a.getDenseBlock(), u.getDenseBlock(), v.getDenseBlock(), b, scalars, out.getDenseBlock(), n, m, k, _outerProductType, 0, m, 0, n); + else + executeCellwiseSparse(a.getSparseBlock(), u.getDenseBlock(), v.getDenseBlock(), b, scalars, out, n, m, k, (int) a.getNonZeros(), _outerProductType, 0, m, 0, n); + break; + + case AGG_OUTER_PRODUCT: + throw new DMLRuntimeException("Wrong codepath for aggregate outer product."); + } + + //post-processing + out.recomputeNonZeros(); + out.examSparsity(); + } + + @Override + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int numThreads) + throws DMLRuntimeException + { + //sanity check + if( inputs==null || inputs.size() < 3 || out==null ) + throw new RuntimeException("Invalid input arguments."); + + //check empty result + if( (_outerProductType == OutProdType.LEFT_OUTER_PRODUCT && inputs.get(1).isEmptyBlock(false)) //U is empty + || (_outerProductType == OutProdType.RIGHT_OUTER_PRODUCT && inputs.get(2).isEmptyBlock(false)) //V is empty + || (_outerProductType == OutProdType.CELLWISE_OUTER_PRODUCT && inputs.get(0).isEmptyBlock(false))) { //X is empty + out.examSparsity(); //turn empty dense into sparse + return; + } + + //input preparation and result allocation (Allocate the output that is set by 
Sigma2CPInstruction) + if(_outerProductType == OutProdType.CELLWISE_OUTER_PRODUCT) + { + //assign it to the time and sparse representation of the major input matrix + out.reset(inputs.get(0).getNumRows(), inputs.get(0).getNumColumns(), inputs.get(0).isInSparseFormat()); + out.allocateDenseOrSparseBlock(); + } + else + { + //if left outerproduct gives a value of k*n instead of n*k, change it back to n*k and then transpose the output + //if(_outerProductType == OutProdType.LEFT_OUTER_PRODUCT && out.getNumRows() == inputs.get(2).getNumColumns() && out.getNumColumns() == inputs.get(2).getNumRows()) + if(_outerProductType == OutProdType.LEFT_OUTER_PRODUCT ) + out.reset(inputs.get(0).getNumColumns(),inputs.get(1).getNumColumns()); // n*k + else if(_outerProductType == OutProdType.RIGHT_OUTER_PRODUCT ) + out.reset(inputs.get(0).getNumRows(),inputs.get(1).getNumColumns()); // m*k + out.allocateDenseBlock(); + } + + //input preparation + double[][] b = prepInputMatrices(inputs, 3); + double[] scalars = prepInputScalars(scalarObjects); + + //core sequential execute + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + final int k = inputs.get(1).getNumColumns(); // rank + + try + { + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ArrayList<ParExecTask> tasks = new ArrayList<ParExecTask>(); + //create tasks (for wdivmm-left, parallelization over columns; + //for wdivmm-right, parallelization over rows; both ensure disjoint results) + + if( _outerProductType == OutProdType.LEFT_OUTER_PRODUCT ) { + int blklen = (int)(Math.ceil((double)n/numThreads)); + for( int j=0; j<numThreads & j*blklen<n; j++ ) + tasks.add(new ParExecTask(inputs.get(0), inputs.get(1).getDenseBlock(), inputs.get(2).getDenseBlock(), b, scalars, out, n, m, k, _outerProductType, 0, m, j*blklen, Math.min((j+1)*blklen, n))); + } + else { ///right // cellwise + int blklen = (int)(Math.ceil((double)m/numThreads)); + for( int i=0; i<numThreads & i*blklen<m; 
i++ ) + tasks.add(new ParExecTask(inputs.get(0), inputs.get(1).getDenseBlock(), inputs.get(2).getDenseBlock(), b, scalars, out, n, m, k, _outerProductType, i*blklen, Math.min((i+1)*blklen,m), 0, n)); + } + List<Future<Long>> taskret = pool.invokeAll(tasks); + pool.shutdown(); + for( Future<Long> task : taskret ) + out.setNonZeros(out.getNonZeros() + task.get()); + } + catch (Exception e) { + throw new DMLRuntimeException(e); + } + + //post-processing + out.examSparsity(); + } + + private void executeDense(double[] a, double[] u, double[] v, double[][] b, double[] scalars , double[] c, int n, int m, int k, OutProdType type, int rl, int ru, int cl, int cu ) + { + //approach: iterate over non-zeros of w, selective mm computation + //cache-conscious blocking: due to blocksize constraint (default 1000), + //a blocksize of 16 allows to fit blocks of UV into L2 cache (256KB) + + final int blocksizeIJ = 16; //u/v block (max at typical L2 size) + int cix = 0; + //blocked execution + for( int bi = rl; bi < ru; bi+=blocksizeIJ ) + for( int bj = cl, bimin = Math.min(ru, bi+blocksizeIJ); bj < cu; bj+=blocksizeIJ ) + { + int bjmin = Math.min(cu, bj+blocksizeIJ); + + //core computation + for( int i=bi, ix=bi*n, uix=bi*k; i<bimin; i++, ix+=n, uix+=k ) + for( int j=bj, vix=bj*k; j<bjmin; j++, vix+=k) + if( a[ix+j] != 0 ) { + cix = (type == OutProdType.LEFT_OUTER_PRODUCT) ? 
vix : uix; + genexecDense( a[ix+j], u, uix, v, vix, b, scalars, c, cix, n, m, k, i,j);//(ix+j)/n, (ix+j)%n ); + } + } + } + + private void executeCellwiseDense(double[] a, double[] u, double[] v, double[][] b, double[] scalars , double[] c, int n, int m, int k, OutProdType type, int rl, int ru, int cl, int cu ) + { + //approach: iterate over non-zeros of w, selective mm computation + //cache-conscious blocking: due to blocksize constraint (default 1000), + //a blocksize of 16 allows to fit blocks of UV into L2 cache (256KB) + + final int blocksizeIJ = 16; //u/v block (max at typical L2 size) + //blocked execution + for( int bi = rl; bi < ru; bi+=blocksizeIJ ) + for( int bj = cl, bimin = Math.min(ru, bi+blocksizeIJ); bj < cu; bj+=blocksizeIJ ) + { + int bjmin = Math.min(cu, bj+blocksizeIJ); + + //core computation + for( int i=bi, ix=bi*n, uix=bi*k; i<bimin; i++, ix+=n, uix+=k ) + for( int j=bj, vix=bj*k; j<bjmin; j++, vix+=k) + if( a[ix+j] != 0 ) { + //int cix = (type == OutProdType.LEFT_OUTER_PRODUCT) ? vix : uix; + if(type == OutProdType.CELLWISE_OUTER_PRODUCT) + c[ix+j] = genexecCellwise( a[ix+j], u, uix, v, vix, b, scalars, n, m, k, i, j ); + else + c[0] += genexecCellwise( a[ix+j], u, uix, v, vix, b, scalars, n, m, k, i, j); // (ix+j)/n, (ix+j)%n ); + } + } + } + + private void executeSparse(SparseBlock sblock, double[] u, double[] v, double[][] b, double[] scalars , double[] c, int n, int m, int k, int nnz, OutProdType type, int rl, int ru, int cl, int cu) + { + boolean left = (_outerProductType== OutProdType.LEFT_OUTER_PRODUCT); + + //approach: iterate over non-zeros of w, selective mm computation + //blocked over ij, while maintaining front of column indexes, where the + //blocksize is chosen such that we reuse each Ui/Vj vector on average 8 times, + //with custom blocksizeJ for wdivmm_left to avoid LLC misses on output. + final int blocksizeI = (int) (8L*m*n/nnz); + final int blocksizeJ = left ? 
Math.max(8,Math.min(L2_CACHESIZE/(k*8), blocksizeI)) : blocksizeI; + int[] curk = new int[blocksizeI]; + + for( int bi = rl; bi < ru; bi+=blocksizeI ) + { + int bimin = Math.min(ru, bi+blocksizeI); + //prepare starting indexes for block row + for( int i=bi; i<bimin; i++ ) { + int index = (cl==0||sblock.isEmpty(i)) ? 0 : sblock.posFIndexGTE(i,cl); + curk[i-bi] = (index>=0) ? index : n; + } + + //blocked execution over column blocks + for( int bj = cl; bj < cu; bj+=blocksizeJ ) + { + int bjmin = Math.min(cu, bj+blocksizeJ); + //core wdivmm block matrix mult + for( int i=bi, uix=bi*k; i<bimin; i++, uix+=k ) { + if( sblock.isEmpty(i) ) continue; + + int wpos = sblock.pos(i); + int wlen = sblock.size(i); + int[] wix = sblock.indexes(i); + double[] wval = sblock.values(i); + + int index = wpos + curk[i-bi]; + for( ; index<wpos+wlen && wix[index]<bjmin; index++ ) { + genexecDense( wval[index], u, uix, v, wix[index]*k, b, scalars, c, + (left ? wix[index]*k : uix), n, m, k, i, wix[index] ); + } + curk[i-bi] = index - wpos; + } + } + } + } + + private void executeCellwiseSparse(SparseBlock sblock, double[] u, double[] v, double[][] b, double[] scalars , MatrixBlock out, int n, int m, int k, long nnz, OutProdType type, int rl, int ru, int cl, int cu ) + { + final int blocksizeIJ = (int) (8L*m*n/nnz); + int[] curk = new int[blocksizeIJ]; + + if( !out.isInSparseFormat() ) //DENSE + { + double[] c = out.getDenseBlock(); + for( int bi=rl; bi<ru; bi+=blocksizeIJ ) { + int bimin = Math.min(ru, bi+blocksizeIJ); + //prepare starting indexes for block row + Arrays.fill(curk, 0); + //blocked execution over column blocks + for( int bj=0; bj<n; bj+=blocksizeIJ ) { + int bjmin = Math.min(n, bj+blocksizeIJ); + for( int i=bi, uix=bi*k; i<bimin; i++, uix+=k ) { + if( sblock.isEmpty(i) ) continue; + int wpos = sblock.pos(i); + int wlen = sblock.size(i); + int[] wix = sblock.indexes(i); + double[] wval = sblock.values(i); + int index = wpos + curk[i-bi]; + for( ; index<wpos+wlen && 
wix[index]<bjmin; index++ ) { + if(type == OutProdType.CELLWISE_OUTER_PRODUCT) + c[index] = genexecCellwise( wval[index], u, uix, v, wix[index]*k, b, scalars, n, m, k, i, wix[index] ); + else + c[0] += genexecCellwise( wval[index], u, uix, v, wix[index]*k, b, scalars, n, m, k, i, wix[index]); // (ix+j)/n, (ix+j)%n ); + } + curk[i-bi] = index - wpos; + } + } + } + } + else //SPARSE + { + SparseBlock c = out.getSparseBlock(); + for( int bi=rl; bi<ru; bi+=blocksizeIJ ) { + int bimin = Math.min(ru, bi+blocksizeIJ); + //prepare starting indexes for block row + Arrays.fill(curk, 0); + //blocked execution over column blocks + for( int bj=0; bj<n; bj+=blocksizeIJ ) { + int bjmin = Math.min(n, bj+blocksizeIJ); + for( int i=bi, uix=bi*k; i<bimin; i++, uix+=k ) { + if( sblock.isEmpty(i) ) continue; + int wpos = sblock.pos(i); + int wlen = sblock.size(i); + int[] wix = sblock.indexes(i); + double[] wval = sblock.values(i); + int index = wpos + curk[i-bi]; + for( ; index<wpos+wlen && wix[index]<bjmin; index++ ) { + c.append(i, index, genexecCellwise( wval[index], u, uix, v, + wix[index]*k, b, scalars, n, m, k, i, wix[index] )); + } + curk[i-bi] = index - wpos; + } + } + } + } + } + + protected abstract void genexecDense( double a, double[] u, int ui, double[] v, int vi, double[][] b, double[] scalars , double[] c, int ci, int n, int m, int k, int rowIndex, int colIndex ); + + protected abstract double genexecCellwise( double a, double[] u, int ui, double[] v, int vi, double[][] b, double[] scalars , int n, int m, int k, int rowIndex, int colIndex); + + private class ParExecTask implements Callable<Long> + { + private final MatrixBlock _a; + private final double[] _u; + private final double[] _v; + private final double[][] _b; + private final double[] _scalars; + private final MatrixBlock _c; + private final int _clen; + private final int _rlen; + private final int _k; + private final OutProdType _type; + private final int _rl; + private final int _ru; + private final int _cl; + 
private final int _cu; + + protected ParExecTask( MatrixBlock a, double[] u, double[] v, double[][] b, double[] scalars , MatrixBlock c, int clen, int rlen, int k, OutProdType type, int rl, int ru, int cl, int cu ) { + _a = a; + _u = u; + _v = v; + _b = b; + _c = c; + _scalars = scalars; + _clen = clen; + _rlen = rlen; + _k = k; + _type = type; + _rl = rl; + _ru = ru; + _cl = cl; + _cu = cu; + } + + @Override + public Long call() throws DMLRuntimeException { + switch(_type) + { + case LEFT_OUTER_PRODUCT: + case RIGHT_OUTER_PRODUCT: + if( !_a.isInSparseFormat() ) + executeDense(_a.getDenseBlock(), _u, _v, _b, _scalars, _c.getDenseBlock(), _clen, _rlen, _k, _type, _rl, _ru, _cl, _cu); + else + executeSparse(_a.getSparseBlock(), _u, _v, _b, _scalars, _c.getDenseBlock(), _clen, _rlen, _k, (int) _a.getNonZeros(), _type, _rl, _ru, _cl, _cu); + break; + case CELLWISE_OUTER_PRODUCT: + if( !_c.isInSparseFormat() ) + executeCellwiseDense(_a.getDenseBlock(), _u, _v, _b, _scalars, _c.getDenseBlock(), _clen, _rlen, _k, _type, _rl, _ru, _cl, _cu); + else + executeCellwiseSparse(_a.getSparseBlock(), _u, _v, _b, _scalars, _c, _clen, _rlen, _k, (int) _a.getNonZeros(), _type, _rl, _ru, _cl, _cu); + break; + case AGG_OUTER_PRODUCT: + throw new DMLRuntimeException("Wrong codepath for aggregate outer product."); + } + + int rl = _outerProductType == OutProdType.LEFT_OUTER_PRODUCT ? _cl : _rl; + int ru = _outerProductType == OutProdType.LEFT_OUTER_PRODUCT ? 
_cu : _ru; + return _c.recomputeNonZeros(rl, ru-1, 0, _c.getNumColumns()-1); + } + } + + private class ParOuterProdAggTask implements Callable<Double> + { + private final MatrixBlock _a; + private final double[] _u; + private final double[] _v; + private final double[][] _b; + private final double[] _scalars; + private final int _clen; + private final int _rlen; + private final int _k; + private final OutProdType _type; + private final int _rl; + private final int _ru; + private final int _cl; + private final int _cu; + + protected ParOuterProdAggTask( MatrixBlock a, double[] u, double[] v, double[][] b, double[] scalars, int clen, int rlen, int k, OutProdType type, int rl, int ru, int cl, int cu ) { + _a = a; + _u = u; + _v = v; + _b = b; + _scalars = scalars; + _clen = clen; + _rlen = rlen; + _k = k; + _type = type; + _rl = rl; + _ru = ru; + _cl = cl; + _cu = cu; + } + + @Override + public Double call() throws DMLRuntimeException { + MatrixBlock out = new MatrixBlock(1, 1, false); + out.allocateDenseBlock(); + if(!_a.isInSparseFormat()) + executeCellwiseDense(_a.getDenseBlock(), _u, _v, _b, _scalars, out.getDenseBlock(), _clen, _rlen, _k, _type, _rl, _ru, _cl, _cu); + else + executeCellwiseSparse(_a.getSparseBlock(), _u, _v, _b, _scalars, out, _clen, _rlen, _k, _a.getNonZeros(), _type, _rl, _ru, _cl, _cu); + return out.getDenseBlock()[0]; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowAggregate.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowAggregate.java new file mode 100644 index 0000000..c0b58a8 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowAggregate.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.codegen; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.matrix.data.LibMatrixMult; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.util.UtilFunctions; + + +public abstract class SpoofRowAggregate extends SpoofOperator +{ + private static final long serialVersionUID = 6242910797139642998L; + private static final long PAR_NUMCELL_THRESHOLD = 1024*1024; //Min 1M elements + + protected boolean _colVector = false; + + public SpoofRowAggregate() { + + } + + @Override + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out) + throws DMLRuntimeException + { + //sanity check + if( inputs==null || inputs.size() < 1 || out==null ) + throw new RuntimeException("Invalid input arguments."); + + //result allocation and preparations + out.reset(_colVector ? 
inputs.get(0).getNumColumns() : 1, + _colVector ? 1 : inputs.get(0).getNumColumns(), false); + out.allocateDenseBlock(); + double[] c = out.getDenseBlock(); + + //input preparation + double[][] b = prepInputMatrices(inputs); + double[] scalars = prepInputScalars(scalarObjects); + + //core sequential execute + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + if( !inputs.get(0).isInSparseFormat() ) + executeDense(inputs.get(0).getDenseBlock(), b, scalars, c, n, 0, m); + else + executeSparse(inputs.get(0).getSparseBlock(), b, scalars, c, n, 0, m); + + //post-processing + out.recomputeNonZeros(); + } + + @Override + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k) + throws DMLRuntimeException + { + //redirect to serial execution + if( k <= 1 || (long)inputs.get(0).getNumRows()*inputs.get(0).getNumColumns()<PAR_NUMCELL_THRESHOLD ) { + execute(inputs, scalarObjects, out); + return; + } + + //sanity check + if( inputs==null || inputs.size() < 1 || out==null ) + throw new RuntimeException("Invalid input arguments."); + + //result allocation and preparations + out.reset(_colVector ? inputs.get(0).getNumColumns() : 1, + _colVector ? 
1 : inputs.get(0).getNumColumns(), false); + out.allocateDenseBlock(); + + //input preparation + double[][] b = prepInputMatrices(inputs); + double[] scalars = prepInputScalars(scalarObjects); + + //core parallel execute + final int m = inputs.get(0).getNumRows(); + final int n = inputs.get(0).getNumColumns(); + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + ArrayList<ParExecTask> tasks = new ArrayList<ParExecTask>(); + int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); + int blklen = (int)(Math.ceil((double)m/nk)); + for( int i=0; i<nk & i*blklen<m; i++ ) + tasks.add(new ParExecTask(inputs.get(0), b, scalars, n, i*blklen, Math.min((i+1)*blklen, m))); + //execute tasks + List<Future<double[]>> taskret = pool.invokeAll(tasks); + pool.shutdown(); + //aggregate partial results + for( Future<double[]> task : taskret ) + LibMatrixMult.vectAdd(task.get(), out.getDenseBlock(), 0, 0, n); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + //post-processing + out.recomputeNonZeros(); + } + + private void executeDense(double[] a, double[][] b, double[] scalars, double[] c, int n, int rl, int ru) + { + for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) { + //call generated method + genexecRowDense( a, aix, b, scalars, c, n, i ); + } + } + + private void executeSparse(SparseBlock sblock, double[][] b, double[] scalars, double[] c, int n, int rl, int ru) + { + for( int i=rl; i<ru; i++ ) { + if( !sblock.isEmpty(i) ) { + double[] avals = sblock.values(i); + int[] aix = sblock.indexes(i); + int apos = sblock.pos(i); + int alen = sblock.size(i); + + //call generated method + genexecRowSparse(avals, aix, apos, b, scalars, c, alen, i); + } + } + } + + //methods to be implemented by generated operators of type SpoofRowAggrgate + + protected abstract void genexecRowDense( double[] a, int ai, double[][] b, double[] scalars, double[] c, int len, int rowIndex ); + + protected abstract void genexecRowSparse( double[] avals, int[] aix, int ai, 
double[][] b, double[] scalars, double[] c, int len, int rowIndex ); + + + /** + * Task for multi-threaded operations. + */ + private class ParExecTask implements Callable<double[]> + { + private final MatrixBlock _a; + private final double[][] _b; + private final double[] _scalars; + private final int _clen; + private final int _rl; + private final int _ru; + + protected ParExecTask( MatrixBlock a, double[][] b, double[] scalars, int clen, int rl, int ru ) { + _a = a; + _b = b; + _scalars = scalars; + _clen = clen; + _rl = rl; + _ru = ru; + } + + @Override + public double[] call() throws DMLRuntimeException { + double[] c = new double[_clen]; + if( !_a.isInSparseFormat() ) + executeDense(_a.getDenseBlock(), _b, _scalars, c, _clen, _rl, _ru); + else + executeSparse(_a.getSparseBlock(), _b, _scalars, c, _clen, _rl, _ru); + + return c; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/IDSequence.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/IDSequence.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/IDSequence.java index 1d173bb..24056dc 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/IDSequence.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/IDSequence.java @@ -25,8 +25,6 @@ package org.apache.sysml.runtime.controlprogram.parfor.util; */ public class IDSequence { - - private long _current = -1; private boolean wrapAround = false; @@ -60,22 +58,11 @@ public class IDSequence return _current; } - public synchronized void reset() - { - _current = 0; - } - - /* - private AtomicLong _seq = new AtomicLong(0); - - public long getNextID() - { - return _seq.getAndIncrement(); + public synchronized long getCurrentID() { + return _current; } - public void reset() - { - _seq = new AtomicLong( 
0 ); + public synchronized void reset() { + _current = 0; } - */ } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/instructions/cp/RelationalBinaryCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/RelationalBinaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/RelationalBinaryCPInstruction.java index c749764..dedfe56 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/RelationalBinaryCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/RelationalBinaryCPInstruction.java @@ -28,9 +28,7 @@ import org.apache.sysml.runtime.matrix.operators.Operator; public abstract class RelationalBinaryCPInstruction extends BinaryCPInstruction { - - public RelationalBinaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr ) - { + public RelationalBinaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr ) { super(op, in1, in2, out, opcode, istr); _cptype = CPINSTRUCTION_TYPE.RelationalBinary; } @@ -42,34 +40,19 @@ public abstract class RelationalBinaryCPInstruction extends BinaryCPInstruction CPOperand out = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN); String opcode = parseBinaryInstruction(str, in1, in2, out); - // TODO: Relational operations need not have value type checking ValueType vt1 = in1.getValueType(); DataType dt1 = in1.getDataType(); - ValueType vt2 = in2.getValueType(); DataType dt2 = in2.getDataType(); DataType dt3 = out.getDataType(); - //if ( vt3 != ValueType.BOOLEAN ) - // throw new DMLRuntimeException("Unexpected ValueType in RelationalCPInstruction: " + str); - - if ( vt1 == ValueType.BOOLEAN && !opcode.equalsIgnoreCase("==") && !opcode.equalsIgnoreCase("!=") ) + if( vt1 == ValueType.BOOLEAN && !opcode.equalsIgnoreCase("==") 
&& !opcode.equalsIgnoreCase("!=") ) throw new DMLRuntimeException("Operation " + opcode + " can not be applied on boolean values " + "(Instruction = " + str + ")."); - //prithvi TODO - //make sure these checks belong here - //if either input is a matrix, then output - //has to be a matrix - if((dt1 == DataType.MATRIX - || dt2 == DataType.MATRIX) - && dt3 != DataType.MATRIX) - throw new DMLRuntimeException("Element-wise matrix operations between variables " - + in1.getName() - + " and " - + in2.getName() - + " must produce a matrix, which " - + out.getName() - + " is not"); + // check for valid data type of output + if((dt1 == DataType.MATRIX || dt2 == DataType.MATRIX) && dt3 != DataType.MATRIX) + throw new DMLRuntimeException("Element-wise matrix operations between variables " + in1.getName() + + " and " + in2.getName() + " must produce a matrix, which " + out.getName() + " is not"); Operator operator = (dt1 != dt2) ? InstructionUtils.parseScalarBinaryOperator(opcode, (dt1 == DataType.SCALAR)) : @@ -77,24 +60,11 @@ public abstract class RelationalBinaryCPInstruction extends BinaryCPInstruction //for scalar relational operations we only allow boolean operands //or when both operands are numeric (int or double) - if(dt1 == DataType.SCALAR && dt2 == DataType.SCALAR){ - if (!( (vt1 == ValueType.BOOLEAN && vt2 == ValueType.BOOLEAN) - ||(vt1 == ValueType.STRING && vt2 == ValueType.STRING) - ||( (vt1 == ValueType.DOUBLE || vt1 == ValueType.INT) && (vt2 == ValueType.DOUBLE || vt2 == ValueType.INT)))) - { - throw new DMLRuntimeException("unexpected value-type in " - + "Relational Binary Instruction " - + "involving scalar operands."); - } + if(dt1 == DataType.SCALAR && dt2 == DataType.SCALAR) return new ScalarScalarRelationalCPInstruction(operator, in1, in2, out, opcode, str); - - }else if (dt1 == DataType.MATRIX || dt2 == DataType.MATRIX){ - if(dt1 == DataType.MATRIX && dt2 == DataType.MATRIX) - return new MatrixMatrixRelationalCPInstruction(operator, in1, in2, out, 
opcode, str); - else - return new ScalarMatrixRelationalCPInstruction(operator, in1, in2, out, opcode, str); - } - - return null; + else if(dt1 == DataType.MATRIX && dt2 == DataType.MATRIX) + return new MatrixMatrixRelationalCPInstruction(operator, in1, in2, out, opcode, str); + else + return new ScalarMatrixRelationalCPInstruction(operator, in1, in2, out, opcode, str); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/instructions/cp/ScalarScalarRelationalCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ScalarScalarRelationalCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ScalarScalarRelationalCPInstruction.java index 68607fb..ca2b12e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ScalarScalarRelationalCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ScalarScalarRelationalCPInstruction.java @@ -47,7 +47,8 @@ public class ScalarScalarRelationalCPInstruction extends RelationalBinaryCPInstr BinaryOperator dop = (BinaryOperator) _optr; - if ( so1 instanceof IntObject && so2 instanceof IntObject ) { + if ( (so1 instanceof IntObject || so1 instanceof BooleanObject) + && (so2 instanceof IntObject || so2 instanceof BooleanObject) ) { boolean rval = dop.fn.compare ( so1.getLongValue(), so2.getLongValue() ); sores = (ScalarObject) new BooleanObject(rval); } @@ -55,14 +56,6 @@ public class ScalarScalarRelationalCPInstruction extends RelationalBinaryCPInstr boolean rval = dop.fn.compare ( so1.getDoubleValue(), so2.getDoubleValue() ); sores = (ScalarObject) new BooleanObject(rval); } - else if ( so1 instanceof IntObject && so2 instanceof DoubleObject) { - boolean rval = dop.fn.compare ( so1.getLongValue(), so2.getDoubleValue() ); - sores = (ScalarObject) new BooleanObject(rval); - } - else if ( so1 instanceof DoubleObject && 
so2 instanceof IntObject ) { - boolean rval = dop.fn.compare ( so1.getDoubleValue(), so2.getLongValue() ); - sores = (ScalarObject) new BooleanObject(rval); - } else if ( so1 instanceof BooleanObject && so2 instanceof BooleanObject ) { boolean rval = dop.fn.compare ( so1.getBooleanValue(), so2.getBooleanValue() ); sores = (ScalarObject) new BooleanObject(rval); @@ -71,7 +64,16 @@ public class ScalarScalarRelationalCPInstruction extends RelationalBinaryCPInstr boolean rval = dop.fn.compare ( so1.getStringValue(), so2.getStringValue() ); sores = (ScalarObject) new BooleanObject(rval); } - else throw new DMLRuntimeException("compare(): Invalid combination of value types."); + else if ( so1 instanceof IntObject && so2 instanceof DoubleObject) { + boolean rval = dop.fn.compare ( so1.getLongValue(), so2.getDoubleValue() ); + sores = (ScalarObject) new BooleanObject(rval); + } + else if ( so1 instanceof DoubleObject && so2 instanceof IntObject ) { + boolean rval = dop.fn.compare ( so1.getDoubleValue(), so2.getLongValue() ); + sores = (ScalarObject) new BooleanObject(rval); + } + else throw new DMLRuntimeException("compare(): Invalid combination of value types " + + "(" + so1.getValueType() + ", " + so2.getValueType() + ")."); ec.setScalarOutput(output.getName(), sores); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUContext.java b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUContext.java index 8ca00b2..86a891e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUContext.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUContext.java @@ -30,6 +30,8 @@ import org.apache.sysml.runtime.DMLRuntimeException; import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject; //FIXME merge JCudaContext into GPUContext as this context is anyway CUDA specific + +@SuppressWarnings("rawtypes") public abstract class GPUContext { public static ArrayList<GPUObject> allocatedPointers = new ArrayList<GPUObject>(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUObject.java b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUObject.java index 9708fe8..215b38c 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/GPUObject.java @@ -106,6 +106,7 @@ public abstract class GPUObject * @param GPUSize Desired size to be freed up on the GPU * @throws DMLRuntimeException If no blocks to free up or if not enough blocks with zero locks on them. 
*/ + @SuppressWarnings("rawtypes") protected static void evict(final long GPUSize) throws DMLRuntimeException { synchronized (GPUContext.syncObj) { // Check for the completion of asynchronous cudaFree calls http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaObject.java b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaObject.java index 24063b5..b9c9161 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaObject.java @@ -1004,6 +1004,7 @@ public class JCudaObject extends GPUObject { * @param lda rows in input matrix * @param ldc columns in output matrix * @return transposed matrix + * @throws DMLRuntimeException if operation failed */ public static Pointer transpose(Pointer densePtr, int m, int n, int lda, int ldc) throws DMLRuntimeException { Pointer alpha = LibMatrixCUDA.pointerTo(1.0); @@ -1146,6 +1147,7 @@ public class JCudaObject extends GPUObject { * @param toFree {@link Pointer} instance to be freed * @param synchronous true if to be done synchronously */ + @SuppressWarnings("rawtypes") public static void cudaFreeHelper(final Pointer toFree, boolean synchronous) { if (synchronous) { cudaFree(toFree); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java index 923e618..65f3be1 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java +++ 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java @@ -2905,7 +2905,8 @@ public class LibMatrixMult return val; } - private static double dotProduct( double[] a, double[] b, int ai, int bi, final int len ) + //note: public for use by codegen for consistency + public static double dotProduct( double[] a, double[] b, int ai, int bi, final int len ) { double val = 0; final int bn = len%8; @@ -2933,7 +2934,8 @@ public class LibMatrixMult return val; } - private static double dotProduct( double[] a, double[] b, int[] aix, int ai, final int bi, final int len ) + //note: public for use by codegen for consistency + public static double dotProduct( double[] a, double[] b, int[] aix, int ai, final int bi, final int len ) { double val = 0; final int bn = len%8; @@ -2962,7 +2964,8 @@ public class LibMatrixMult return val; } - private static void vectMultiplyAdd( final double aval, double[] b, double[] c, int bi, int ci, final int len ) + //note: public for use by codegen for consistency + public static void vectMultiplyAdd( final double aval, double[] b, double[] c, int bi, int ci, final int len ) { final int bn = len%8; @@ -3089,7 +3092,8 @@ public class LibMatrixMult } } - private static void vectMultiplyAdd( final double aval, double[] b, double[] c, int[] bix, final int bi, final int ci, final int len ) + //note: public for use by codegen for consistency + public static void vectMultiplyAdd( final double aval, double[] b, double[] c, int[] bix, final int bi, final int ci, final int len ) { final int bn = len%8; @@ -3115,7 +3119,8 @@ public class LibMatrixMult } } - private static void vectMultiplyWrite( final double aval, double[] b, double[] c, int bi, int ci, final int len ) + //note: public for use by codegen for consistency + public static void vectMultiplyWrite( final double aval, double[] b, double[] c, int bi, int ci, final int len ) { final int bn = len%8; @@ -3191,7 +3196,8 @@ public class LibMatrixMult } } - private static void vectAdd( 
double[] a, double[] c, int ai, int ci, final int len ) + //note: public for use by codegen for consistency + public static void vectAdd( double[] a, double[] c, int ai, int ci, final int len ) { final int bn = len%8; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java index 0086f8f..ec145cb 100644 --- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java +++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java @@ -25,8 +25,10 @@ import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; +import java.io.Writer; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; @@ -63,6 +65,7 @@ public class LocalFileUtils public static final String CATEGORY_PARTITIONING = "partitioning"; public static final String CATEGORY_RESULTMERGE = "resultmerge"; public static final String CATEGORY_WORK = "work"; + public static final String CATEGORY_CODEGEN = "codegen"; static { _seq = new IDSequence(); @@ -463,4 +466,25 @@ public class LocalFileUtils return ret; } + + /** + * Writes a simple text file to local file system. 
+ * + * @param file output file (created, or truncated if it already exists) + * @param text content of text file, encoded with the JVM's default charset (via FileWriter) + * @throws IOException if the file cannot be opened, written, or flushed + */ + public static void writeTextFile( File file, String text ) + throws IOException + { + Writer writer = null; + try { + writer = new FileWriter(file); + writer.write(text); + writer.flush(); + } + finally { + IOUtilFunctions.closeSilently(writer); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d7fd5879/src/main/java/org/apache/sysml/utils/Statistics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/utils/Statistics.java b/src/main/java/org/apache/sysml/utils/Statistics.java index 87cb64f..08f0452 100644 --- a/src/main/java/org/apache/sysml/utils/Statistics.java +++ b/src/main/java/org/apache/sysml/utils/Statistics.java @@ -31,6 +31,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.sysml.api.DMLScript; +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -72,6 +74,15 @@ public class Statistics private static AtomicLong hopRecompilePred = new AtomicLong(0); //count private static AtomicLong hopRecompileSB = new AtomicLong(0); //count + //CODEGEN + private static AtomicLong codegenCompileTime = new AtomicLong(0); //in nano + private static AtomicLong codegenClassCompileTime = new AtomicLong(0); //in nano + private static AtomicLong codegenHopCompile = new AtomicLong(0); //count + private static AtomicLong codegenCPlanCompile = new AtomicLong(0); //count + private static AtomicLong codegenClassCompile = new AtomicLong(0); //count + private static AtomicLong codegenPlanCacheHits = new AtomicLong(0); //count + private static AtomicLong codegenPlanCacheTotal = new AtomicLong(0); //count + //Function
recompile stats private static AtomicLong funRecompileTime = new AtomicLong(0); //in nano sec private static AtomicLong funRecompiles = new AtomicLong(0); //count @@ -277,6 +288,62 @@ public class Statistics //note: not synchronized due to use of atomics hopRecompileSB.addAndGet(delta); } + + public static void incrementCodegenDAGCompile() { + codegenHopCompile.incrementAndGet(); + } + + public static void incrementCodegenCPlanCompile(long delta) { + codegenCPlanCompile.addAndGet(delta); + } + + public static void incrementCodegenClassCompile() { + codegenClassCompile.incrementAndGet(); + } + + public static void incrementCodegenCompileTime(long delta) { + codegenCompileTime.addAndGet(delta); + } + + public static void incrementCodegenClassCompileTime(long delta) { + codegenClassCompileTime.addAndGet(delta); + } + + public static void incrementCodegenPlanCacheHits() { + codegenPlanCacheHits.incrementAndGet(); + } + + public static void incrementCodegenPlanCacheTotal() { + codegenPlanCacheTotal.incrementAndGet(); + } + + public static long getCodegenDAGCompile() { + return codegenHopCompile.get(); + } + + public static long getCodegenCPlanCompile() { + return codegenCPlanCompile.get(); + } + + public static long getCodegenClassCompile() { + return codegenClassCompile.get(); + } + + public static long getCodegenCompileTime() { + return codegenCompileTime.get(); + } + + public static long getCodegenClassCompileTime() { + return codegenClassCompileTime.get(); + } + + public static long getCodegenPlanCacheHits() { + return codegenPlanCacheHits.get(); + } + + public static long getCodegenPlanCacheTotal() { + return codegenPlanCacheTotal.get(); + } public static void incrementFunRecompileTime( long delta ) { //note: not synchronized due to use of atomics @@ -657,6 +724,12 @@ public class Statistics sb.append("Functions recompiled:\t\t" + getFunRecompiles() + ".\n"); sb.append("Functions recompile time:\t" + String.format("%.3f", ((double)getFunRecompileTime())/1000000000) 
+ " sec.\n"); } + if( ConfigurationManager.getDMLConfig().getBooleanValue(DMLConfig.CODEGEN) ) { + sb.append("Codegen compile (DAG, CP, JC):\t" + getCodegenDAGCompile() + "/" + getCodegenCPlanCompile() + "/" + getCodegenClassCompile() + ".\n"); + sb.append("Codegen compile times (DAG,JC):\t" + String.format("%.3f", (double)getCodegenCompileTime()/1000000000) + "/" + + String.format("%.3f", (double)getCodegenClassCompileTime()/1000000000) + " sec.\n"); + sb.append("Codegen plan cache hits:\t" + getCodegenPlanCacheHits() + "/" + getCodegenPlanCacheTotal() + ".\n"); + } if( OptimizerUtils.isSparkExecutionMode() ){ String lazy = SparkExecutionContext.isLazySparkContextCreation() ? "(lazy)" : "(eager)"; sb.append("Spark ctx create time "+lazy+":\t"+