[SYSTEMML-569] Extended spark transformencode on frames (mv, omit, bin) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/172bfcac Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/172bfcac Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/172bfcac
Branch: refs/heads/master Commit: 172bfcacc0b260c49c18c9e26d2dfc81f7e3051e Parents: 12f2da9 Author: Matthias Boehm <[email protected]> Authored: Thu Jul 7 21:52:31 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jul 8 10:59:18 2016 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/parser/DataExpression.java | 1 + ...ReturnParameterizedBuiltinSPInstruction.java | 156 +++++++++++++++- .../sysml/runtime/matrix/data/FrameBlock.java | 5 +- .../sysml/runtime/transform/BinAgent.java | 3 +- .../sysml/runtime/transform/MVImputeAgent.java | 184 +++++++++++++------ .../sysml/runtime/transform/RecodeAgent.java | 2 +- .../transform/encode/EncoderComposite.java | 6 +- .../sysml/runtime/util/UtilFunctions.java | 2 +- .../TransformFrameEncodeApplyTest.java | 178 ++++++++++++++++++ .../transform/TransformFrameEncodeApply.dml | 34 ++++ .../functions/transform/ZPackageSuite.java | 1 + 11 files changed, 498 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/parser/DataExpression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java index c33f965..96089f0 100644 --- a/src/main/java/org/apache/sysml/parser/DataExpression.java +++ b/src/main/java/org/apache/sysml/parser/DataExpression.java @@ -763,6 +763,7 @@ public class DataExpression extends DataIdentifier // if the MTD file exists, check the values specified in read statement match values in metadata MTD file if (configObject != null){ parseMetaDataFileParameters(mtdFileName, configObject, conditional); + inferredFormatType = true; } else { LOG.warn("Metadata file: " + new Path(mtdFileName) + " not provided"); 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java index fc9e9ce..a53f673 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import org.apache.spark.Accumulator; import org.apache.spark.AccumulatorParam; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.broadcast.Broadcast; @@ -38,8 +39,10 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.functionobjects.KahanPlus; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyFunction; import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyOffsetFunction; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; @@ -48,10 +51,13 @@ import 
org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.FrameBlock.ColumnMetadata; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.transform.MVImputeAgent; +import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; import org.apache.sysml.runtime.transform.RecodeAgent; import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.transform.encode.EncoderComposite; @@ -126,12 +132,19 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI fo.getSchema(), (int)fo.getNumColumns(), null); Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc()); - in.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild)) - .distinct().groupByKey() - .flatMap(new TransformEncodeGroupFunction(accMax)) - .saveAsTextFile(fometa.getFileName()); //trigger eval + JavaRDD<String> rcMaps = in + .mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild)) + .distinct().groupByKey() + .flatMap(new TransformEncodeGroupFunction(accMax)); + if( containsMVImputeEncoder(encoderBuild) ) { + MVImputeAgent mva = getMVImputeEncoder(encoderBuild); + rcMaps = rcMaps.union( + in.mapPartitionsToPair(new TransformEncodeBuild2Function(mva)) + .groupByKey().flatMap(new TransformEncodeGroup2Function(mva)) ); + } + rcMaps.saveAsTextFile(fometa.getFileName()); //trigger eval - //reuse multi-threaded reader + //consolidate meta data frame (reuse multi-threaded reader, special handling missing values) FrameReader reader = FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo); FrameBlock 
meta = reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(), fo.getNumColumns()); meta.recomputeColumnCardinality(); //recompute num distinct items per column @@ -169,6 +182,32 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI /** * + * @param encoder + * @return + */ + private boolean containsMVImputeEncoder(Encoder encoder) { + if( encoder instanceof EncoderComposite ) + for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() ) + if( cencoder instanceof MVImputeAgent ) + return true; + return false; + } + + /** + * + * @param encoder + * @return + */ + private MVImputeAgent getMVImputeEncoder(Encoder encoder) { + if( encoder instanceof EncoderComposite ) + for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() ) + if( cencoder instanceof MVImputeAgent ) + return (MVImputeAgent) cencoder; + return null; + } + + /** + * */ public static class TransformEncodeBuildFunction implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, String> { @@ -266,4 +305,111 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI return Math.max(arg0, arg1); } } + + /** + * + */ + public static class TransformEncodeBuild2Function implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata> + { + private static final long serialVersionUID = 6336375833412029279L; + + private MVImputeAgent _encoder = null; + + public TransformEncodeBuild2Function(MVImputeAgent encoder) { + _encoder = encoder; + } + + @Override + public Iterable<Tuple2<Integer, ColumnMetadata>> call(Iterator<Tuple2<Long, FrameBlock>> iter) + throws Exception + { + //build meta data (e.g., histograms and means) + while( iter.hasNext() ) { + FrameBlock block = iter.next()._2(); + _encoder.build(block); + } + + //extract meta data + ArrayList<Tuple2<Integer,ColumnMetadata>> ret = new ArrayList<Tuple2<Integer,ColumnMetadata>>(); + int[] collist = _encoder.getColList(); + for( int j=0; 
j<collist.length; j++ ) { + if( _encoder.getMethod(collist[j]) == MVMethod.GLOBAL_MODE ) { + HashMap<String,Long> hist = _encoder.getHistogram(collist[j]); + for( Entry<String,Long> e : hist.entrySet() ) + ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j], + new ColumnMetadata(e.getValue(), e.getKey()))); + } + else if( _encoder.getMethod(collist[j]) == MVMethod.GLOBAL_MEAN ) { + ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j], + new ColumnMetadata(_encoder.getNonMVCount(collist[j]), String.valueOf(_encoder.getMeans()[j]._sum)))); + } + else if( _encoder.getMethod(collist[j]) == MVMethod.CONSTANT ) { + ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j], + new ColumnMetadata(0, _encoder.getReplacement(collist[j])))); + } + } + + return ret; + } + } + + /** + * + */ + public static class TransformEncodeGroup2Function implements FlatMapFunction<Tuple2<Integer, Iterable<ColumnMetadata>>, String> + { + private static final long serialVersionUID = 702100641492347459L; + + private MVImputeAgent _encoder = null; + + public TransformEncodeGroup2Function(MVImputeAgent encoder) { + _encoder = encoder; + } + + @Override + public Iterable<String> call(Tuple2<Integer, Iterable<ColumnMetadata>> arg0) + throws Exception + { + int colix = arg0._1(); + Iterator<ColumnMetadata> iter = arg0._2().iterator(); + ArrayList<String> ret = new ArrayList<String>(); + + //compute global mode of categorical feature, i.e., value with highest frequency + if( _encoder.getMethod(colix) == MVMethod.GLOBAL_MODE ) { + HashMap<String, Long> hist = new HashMap<String,Long>(); + while( iter.hasNext() ) { + ColumnMetadata cmeta = iter.next(); + Long tmp = hist.get(cmeta.getMvValue()); + hist.put(cmeta.getMvValue(), cmeta.getNumDistinct() + ((tmp!=null)?tmp:0)); + } + long max = Long.MIN_VALUE; String mode = null; + for( Entry<String, Long> e : hist.entrySet() ) + if( e.getValue() > max ) { + mode = e.getKey(); + max = e.getValue(); + } + ret.add("-2 " + colix + " " + mode); + } + //compute 
global mean of categorical feature + else if( _encoder.getMethod(colix) == MVMethod.GLOBAL_MEAN ) { + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + int count = 0; + while( iter.hasNext() ) { + ColumnMetadata cmeta = iter.next(); + kplus.execute2(kbuff, Double.parseDouble(cmeta.getMvValue())); + count += cmeta.getNumDistinct(); + } + if( count > 0 ) + ret.add("-2 " + colix + " " + String.valueOf(kbuff._sum/count)); + } + //pass-through constant label + else if( _encoder.getMethod(colix) == MVMethod.CONSTANT ) { + if( iter.hasNext() ) + ret.add("-2 " + colix + " " + iter.next().getMvValue()); + } + + return ret; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 2088e85..051ce58 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -25,6 +25,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.io.Serializable; import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Arrays; @@ -1256,7 +1257,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable /** * */ - public static class ColumnMetadata { + public static class ColumnMetadata implements Serializable { + private static final long serialVersionUID = -90094082422100311L; + private long _ndistinct = 0; private String _mvValue = null; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java index fe83627..ad7cbfc 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java @@ -375,8 +375,7 @@ public class BinAgent extends Encoder @Override public FrameBlock getMetaData(FrameBlock meta) { - // TODO Auto-generated method stub - return null; + return meta; } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java index 344693c..68896ac 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java @@ -30,6 +30,7 @@ import java.util.BitSet; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -67,15 +68,8 @@ public class MVImputeAgent extends Encoder public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT }; - /* - * Imputation Methods: - * 1 - global_mean - * 2 - global_mode - * 3 - constant - * - */ - private byte[] _mvMethodList = null; - private byte[] _mvscMethodList = null; // scaling methods for attributes that are imputed and also scaled + private MVMethod[] _mvMethodList = null; + private MVMethod[] _mvscMethodList = null; // scaling methods for attributes that are imputed and also scaled private BitSet _isMVScaled = null; private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE); // function object that 
understands variance computation @@ -86,10 +80,9 @@ public class MVImputeAgent extends Encoder private long[] _countList = null; // #of non-missing values private CM_COV_Object[] _varList = null; // column-level variances, computed so far (for scaling) - private int[] _scnomvList = null; // List of attributes that are scaled but not imputed - private byte[] _scnomvMethodList = null; // scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring + private MVMethod[] _scnomvMethodList = null; // scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring private KahanObject[] _scnomvMeanList = null; // column-level means, for attributes scaled but not imputed private long[] _scnomvCountList = null; // #of non-missing values, for attributes scaled but not imputed private CM_COV_Object[] _scnomvVarList = null; // column-level variances, computed so far @@ -97,6 +90,7 @@ public class MVImputeAgent extends Encoder private String[] _replacementList = null; // replacements: for global_mean, mean; and for global_mode, recode id of mode category private String[] _NAstrings = null; private List<Integer> _rcList = null; + private HashMap<Integer,HashMap<String,Long>> _hist = null; public String[] getReplacements() { return _replacementList; } public KahanObject[] getMeans() { return _meanList; } @@ -108,9 +102,16 @@ public class MVImputeAgent extends Encoder throws JSONException { super(null, clen); + + //handle column list int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, TfUtils.TXMETHOD_IMPUTE); initColList(collist); + //handle method list + parseMethodsAndReplacments(parsedSpec); + + //create reuse histograms + _hist = new HashMap<Integer, HashMap<String,Long>>(); } public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings, int clen) @@ -136,7 +137,7 @@ public class MVImputeAgent extends Encoder int mvLength = mvattrs.size(); _colList = new int[mvLength]; - _mvMethodList = new byte[mvLength]; + _mvMethodList = new 
MVMethod[mvLength]; _meanList = new KahanObject[mvLength]; _countList = new long[mvLength]; @@ -147,7 +148,7 @@ public class MVImputeAgent extends Encoder for(int i=0; i < _colList.length; i++) { _colList[i] = UtilFunctions.toInt(mvattrs.get(i)); - _mvMethodList[i] = (byte) UtilFunctions.toInt(mvmthds.get(i)); + _mvMethodList[i] = MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))]; _meanList[i] = new KahanObject(0, 0); } @@ -173,7 +174,7 @@ public class MVImputeAgent extends Encoder else { if ( _colList != null ) - _mvscMethodList = new byte[_colList.length]; + _mvscMethodList = new MVMethod[_colList.length]; JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE); JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS); @@ -195,7 +196,7 @@ public class MVImputeAgent extends Encoder if(mvidx != -1) { _isMVScaled.set(mvidx); - _mvscMethodList[mvidx] = mthd; + _mvscMethodList[mvidx] = MVMethod.values()[mthd]; _varList[mvidx] = new CM_COV_Object(); } else @@ -205,7 +206,7 @@ public class MVImputeAgent extends Encoder if(scnomv > 0) { _scnomvList = new int[scnomv]; - _scnomvMethodList = new byte[scnomv]; + _scnomvMethodList = new MVMethod[scnomv]; _scnomvMeanList = new KahanObject[scnomv]; _scnomvCountList = new long[scnomv]; @@ -219,7 +220,7 @@ public class MVImputeAgent extends Encoder if(isApplicable(colID) == -1) { // scaled but not imputed _scnomvList[idx] = colID; - _scnomvMethodList[idx] = mthd; + _scnomvMethodList[idx] = MVMethod.values()[mthd]; _scnomvMeanList[idx] = new KahanObject(0, 0); _scnomvVarList[idx] = new CM_COV_Object(); idx++; @@ -229,6 +230,28 @@ public class MVImputeAgent extends Encoder } } + /** + * + * @param parsedSpec + * @throws JSONException + */ + private void parseMethodsAndReplacments(JSONObject parsedSpec) throws JSONException { + JSONArray mvspec = (JSONArray) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE); + _mvMethodList = new MVMethod[mvspec.size()]; + _replacementList = new String[mvspec.size()]; + _meanList = 
new KahanObject[mvspec.size()]; + _countList = new long[mvspec.size()]; + for(int i=0; i < mvspec.size(); i++) { + JSONObject mvobj = (JSONObject)mvspec.get(i); + _mvMethodList[i] = MVMethod.valueOf(mvobj.get("method").toString().toUpperCase()); + if( _mvMethodList[i] == MVMethod.CONSTANT ) { + _replacementList[i] = mvobj.getString("value").toString(); + } + _meanList[i] = new KahanObject(0, 0); + } + } + + public void prepare(String[] words) throws IOException { try { @@ -242,13 +265,13 @@ public class MVImputeAgent extends Encoder if(!TfUtils.isNA(_NAstrings, w)) { _countList[i]++; - boolean computeMean = (_mvMethodList[i] == 1 || _isMVScaled.get(i) ); + boolean computeMean = (_mvMethodList[i] == MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) ); if(computeMean) { // global_mean double d = UtilFunctions.parseToDouble(w); _meanFn.execute2(_meanList[i], d, _countList[i]); - if (_isMVScaled.get(i) && _mvscMethodList[i] == 2) + if (_isMVScaled.get(i) && _mvscMethodList[i] == MVMethod.GLOBAL_MODE) _varFn.execute(_varList[i], d); } else { @@ -271,7 +294,7 @@ public class MVImputeAgent extends Encoder double d = UtilFunctions.parseToDouble(w); _scnomvCountList[i]++; // not required, this is always equal to total #records processed _meanFn.execute2(_scnomvMeanList[i], d, _scnomvCountList[i]); - if(_scnomvMethodList[i] == 2) + if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE) _varFn.execute(_scnomvVarList[i], d); } } catch(Exception e) { @@ -311,15 +334,15 @@ public class MVImputeAgent extends Encoder private DistinctValue prepMeanOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException { - byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]); + MVMethod mthd = (scnomv ? 
_scnomvMethodList[idx] : _mvMethodList[idx]); - if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) { + if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) { String suffix = null; if(scnomv) suffix = "scnomv"; - else if ( mthd ==1 && _isMVScaled.get(idx) ) + else if ( mthd == MVMethod.GLOBAL_MEAN && _isMVScaled.get(idx) ) suffix = "scmv"; // both scaled and mv imputed - else if ( mthd == 1 ) + else if ( mthd == MVMethod.GLOBAL_MEAN ) suffix = "noscmv"; else suffix = "scnomv"; @@ -341,8 +364,8 @@ public class MVImputeAgent extends Encoder } private DistinctValue prepMeanCorrectionOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException { - byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]); - if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) { + MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]); + if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) { sb.setLength(0); //CORRECTION_PREFIX + "_" + taskID + "_" + Double.toString(mean._correction); sb.append(CORRECTION_PREFIX); @@ -357,8 +380,8 @@ public class MVImputeAgent extends Encoder } private DistinctValue prepMeanCountOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException { - byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]); - if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) { + MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]); + if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) { sb.setLength(0); //s = COUNT_PREFIX + "_" + taskID + "_" + Long.toString(count); sb.append(COUNT_PREFIX); @@ -373,8 +396,8 @@ public class MVImputeAgent extends Encoder } private DistinctValue prepTotalCountOutput(int taskID, int idx, StringBuilder sb, boolean scnomv, TfUtils agents) throws CharacterCodingException { - byte mthd = (scnomv ? 
_scnomvMethodList[idx] : _mvMethodList[idx]); - if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) { + MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]); + if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) { sb.setLength(0); //TOTAL_COUNT_PREFIX + "_" + taskID + "_" + Long.toString(TransformationAgent._numValidRecords); sb.append(TOTAL_COUNT_PREFIX); @@ -390,8 +413,8 @@ public class MVImputeAgent extends Encoder private DistinctValue prepConstantOutput(int idx, StringBuilder sb) throws CharacterCodingException { if ( _mvMethodList == null ) return null; - byte mthd = _mvMethodList[idx]; - if ( mthd == 3 ) { + MVMethod mthd = _mvMethodList[idx]; + if ( mthd == MVMethod.CONSTANT ) { sb.setLength(0); sb.append(CONSTANT_PREFIX); sb.append("_"); @@ -402,7 +425,7 @@ public class MVImputeAgent extends Encoder } private DistinctValue prepVarOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException { - if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == 2 ) { + if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == MVMethod.GLOBAL_MODE ) { sb.setLength(0); sb.append(VARIANCE_PREFIX); sb.append("_"); @@ -560,7 +583,7 @@ public class MVImputeAgent extends Encoder double imputedValue = Double.NaN; KahanObject gmean = null; - if ( _mvMethodList[i] == 1 ) + if ( _mvMethodList[i] == MVMethod.GLOBAL_MEAN ) { gmean = _meanList[i]; imputedValue = _meanList[i]._sum; @@ -568,7 +591,7 @@ public class MVImputeAgent extends Encoder double mean = ( _countList[i] == 0 ? 
0.0 : _meanList[i]._sum); writeTfMtd(colID, Double.toString(mean), outputDir, fs, agents); } - else if ( _mvMethodList[i] == 3 ) + else if ( _mvMethodList[i] == MVMethod.CONSTANT ) { writeTfMtd(colID, _replacementList[i], outputDir, fs, agents); @@ -584,7 +607,7 @@ public class MVImputeAgent extends Encoder if ( _isMVScaled.get(i) ) { double sdev = -1.0; - if ( _mvscMethodList[i] == 2 ) { + if ( _mvscMethodList[i] == MVMethod.GLOBAL_MODE ) { // Adjust variance with missing values long totalMissingCount = (agents.getValid() - _countList[i]); _varFn.execute(_varList[i], imputedValue, totalMissingCount); @@ -601,7 +624,7 @@ public class MVImputeAgent extends Encoder int colID = _scnomvList[i]; double mean = (_scnomvCountList[i] == 0 ? 0.0 : _scnomvMeanList[i]._sum); double sdev = -1.0; - if ( _scnomvMethodList[i] == 2 ) + if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MODE ) { double var = _scnomvVarList[i].getRequiredResult(new CMOperator(_varFn, AggregateOperationTypes.VARIANCE)); sdev = Math.sqrt(var); @@ -788,7 +811,7 @@ public class MVImputeAgent extends Encoder // since missing values themselves are replaced with gmean. 
long totalMissingCount = (totalRecordCount-totalValidCount); int idx = isApplicable(colID); - if(idx != -1 && _mvMethodList[idx] == 3) + if(idx != -1 && _mvMethodList[idx] == MVMethod.CONSTANT) _meanFn.execute(gmean, UtilFunctions.parseToDouble(_replacementList[idx]), totalRecordCount); _varFn.execute(gcm, gmean._sum, totalMissingCount); } @@ -863,10 +886,10 @@ public class MVImputeAgent extends Encoder for(int i=0; i<_colList.length;i++) { int colID = _colList[i]; - if ( _mvMethodList[i] == 1 || _mvMethodList[i] == 2 ) + if ( _mvMethodList[i] == MVMethod.GLOBAL_MEAN || _mvMethodList[i] == MVMethod.GLOBAL_MODE ) // global_mean or global_mode _replacementList[i] = readReplacement(colID, fs, tfMtdDir, agents); - else if ( _mvMethodList[i] == 3 ) { + else if ( _mvMethodList[i] == MVMethod.CONSTANT ) { // constant: replace a missing value by a given constant // nothing to do. The constant values are loaded already during configure } @@ -894,15 +917,8 @@ public class MVImputeAgent extends Encoder int idx = isApplicable(colID); if(idx == -1) return MVMethod.INVALID; - - switch(_mvMethodList[idx]) - { - case 1: return MVMethod.GLOBAL_MEAN; - case 2: return MVMethod.GLOBAL_MODE; - case 3: return MVMethod.CONSTANT; - default: return MVMethod.INVALID; - } - + else + return _mvMethodList[idx]; } public long getNonMVCount(int colID) { @@ -917,14 +933,48 @@ public class MVImputeAgent extends Encoder @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - // TODO Auto-generated method stub - return null; + build(in); + return apply(in, out); } @Override public void build(FrameBlock in) { - // TODO Auto-generated method stub - + try { + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; + if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) { + //compute global column mean (scale) + long off = _countList[j]; + for( int i=0; i<in.getNumRows(); i++ ) + _meanFn.execute2(_meanList[j], UtilFunctions.objectToDouble( + in.getSchema().get(colID-1), in.get(i, 
colID-1)), off+i+1); + _replacementList[j] = String.valueOf(_meanList[j]._sum); + _countList[j] += in.getNumRows(); + } + else if( _mvMethodList[j] == MVMethod.GLOBAL_MODE ) { + //compute global column mode (categorical), i.e., most frequent category + HashMap<String,Long> hist = _hist.containsKey(colID) ? + _hist.get(colID) : new HashMap<String,Long>(); + for( int i=0; i<in.getNumRows(); i++ ) { + String key = String.valueOf(in.get(i, colID-1)); + if( key != null && !key.isEmpty() ) { + Long val = hist.get(key); + hist.put(key, (val!=null) ? val+1 : 1); + } + } + _hist.put(colID, hist); + long max = Long.MIN_VALUE; + for( Entry<String, Long> e : hist.entrySet() ) + if( e.getValue() > max ) { + _replacementList[j] = e.getKey(); + max = e.getValue(); + } + } + } + } + catch(Exception ex) { + throw new RuntimeException(ex); + } } @Override @@ -938,7 +988,7 @@ public class MVImputeAgent extends Encoder w = words[colID-1] = _replacementList[i]; if ( _isMVScaled.get(i) ) - if ( _mvscMethodList[i] == 1 ) + if ( _mvscMethodList[i] == MVMethod.GLOBAL_MEAN ) words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum ); else words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum ); @@ -948,7 +998,7 @@ public class MVImputeAgent extends Encoder for(int i=0; i < _scnomvList.length; i++) { int colID = _scnomvList[i]; - if ( _scnomvMethodList[i] == 1 ) + if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN ) words[colID-1] = Double.toString( UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum ); else words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / _scnomvVarList[i].mean._sum ); @@ -971,8 +1021,11 @@ public class MVImputeAgent extends Encoder @Override public FrameBlock getMetaData(FrameBlock out) { - // TODO Auto-generated method stub - return null; + for( int j=0; j<_colList.length; j++ ) { + 
out.getColumnMetadata(_colList[j]-1) + .setMvValue(_replacementList[j]); + } + return out; } /** @@ -983,14 +1036,13 @@ public class MVImputeAgent extends Encoder public void initMetaData(FrameBlock meta) { //init replacement lists, replace recoded values to //apply mv imputation potentially after recoding - _replacementList = new String[_colList.length]; for( int j=0; j<_colList.length; j++ ) { - int colID = _colList[j]; + int colID = _colList[j]; String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); if( _rcList.contains(colID) ) { Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal); - if( mvVal2 == null) - throw new RuntimeException("Missing recode value for impute value '"+mvVal+"'."); + if( mvVal2 == null) + throw new RuntimeException("Missing recode value for impute value '"+mvVal+"' (colID="+colID+")."); _replacementList[j] = mvVal2.toString(); } else { @@ -1006,4 +1058,14 @@ public class MVImputeAgent extends Encoder public void initRecodeIDList(List<Integer> rcList) { _rcList = rcList; } + + /** + * Exposes the internal histogram after build. 
+ * + * @param colID + * @return + */ + public HashMap<String,Long> getHistogram( int colID ) { + return _hist.get(colID); + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java index 01d7c85..5abe9db 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java @@ -412,7 +412,7 @@ public class RecodeAgent extends Encoder //probe and build column map HashMap<String,Long> map = _rcdMaps.get(colID); String key = row[colID-1]; - if( !map.containsKey(key) ) + if( key!=null && !key.isEmpty() && !map.containsKey(key) ) map.put(key, new Long(map.size()+1)); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java index bafa655..d6bf9d4 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java @@ -78,10 +78,10 @@ public class EncoderComposite extends Encoder //propagate meta data _meta = new FrameBlock(in.getNumColumns(), ValueType.STRING); - for( Encoder encoder : _encoders ) { - encoder.initMetaData(_meta); + for( Encoder encoder : _encoders ) _meta = encoder.getMetaData(_meta); - } + for( Encoder encoder : _encoders ) + encoder.initMetaData(_meta); //apply meta data for( Encoder encoder : _encoders ) 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index 5e6e5b7..6bce4ff 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -369,7 +369,7 @@ public class UtilFunctions public static double objectToDouble(ValueType vt, Object in) { if( in == null ) return 0; switch( vt ) { - case STRING: return Double.parseDouble((String)in); + case STRING: return !((String)in).isEmpty() ? Double.parseDouble((String)in) : 0; case BOOLEAN: return ((Boolean)in)?1d:0d; case INT: return (Long)in; case DOUBLE: return (Double)in; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java new file mode 100644 index 0000000..b61060b --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.transform; + +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.io.MatrixReaderFactory; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class TransformFrameEncodeApplyTest extends AutomatedTestBase +{ + private final static String TEST_NAME1 = "TransformFrameEncodeApply"; + private final static String TEST_DIR = "functions/transform/"; + private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameEncodeApplyTest.class.getSimpleName() + "/"; + + //dataset and transform tasks without missing values + private final static String DATASET1 = "homes3/homes.csv"; + private final static String SPEC1 = "homes3/homes.tfspec_recode.json"; + private final static String SPEC2 = "homes3/homes.tfspec_dummy.json"; + private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; //incl recode + + //dataset and transform tasks with missing values + private final static String DATASET2 = "homes/homes.csv"; + private final static String SPEC4 = "homes3/homes.tfspec_impute.json"; + private final static String SPEC5 = "homes3/homes.tfspec_omit.json"; + + public enum TransformType { + RECODE, + DUMMY, + BIN, +
IMPUTE, + OMIT, + } + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "y" }) ); + } + + @Test + public void testHomesRecodeSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.RECODE); + } + + @Test + public void testHomesRecodeSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.RECODE); + } + + @Test + public void testHomesDummycodeSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.DUMMY); + } + + @Test + public void testHomesDummycodeSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.DUMMY); + } + + @Test + public void testHomesBinningSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.BIN); + } + + @Test + public void testHomesBinningSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.BIN); + } + + @Test + public void testHomesOmitSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.OMIT); + } + + @Test + public void testHomesOmitSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.OMIT); + } + + @Test + public void testHomesImputeSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.IMPUTE); + } + + @Test + public void testHomesImputeSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.IMPUTE); + } + + /** + * Runs the TransformFrameEncodeApply script and compares its two CSV outputs (tfout1, tfout2). + * @param rt runtime platform to run the script on; restored to the previous value afterwards + * @param ofmt output format; any value other than csv raises a RuntimeException + * @param type transform type; selects the spec and dataset passed to the script + */ + private void runTransformTest( RUNTIME_PLATFORM rt, String ofmt, TransformType type ) + { + //set runtime platform + RUNTIME_PLATFORM rtold = rtplatform; + boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK; + rtplatform = rt; + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform ==
RUNTIME_PLATFORM.HYBRID_SPARK) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + //set transform specification + String SPEC = null; String DATASET = null; + switch( type ) { + case RECODE: SPEC = SPEC1; DATASET = DATASET1; break; + case DUMMY: SPEC = SPEC2; DATASET = DATASET1; break; + case BIN: SPEC = SPEC3; DATASET = DATASET1; break; + case IMPUTE: SPEC = SPEC4; DATASET = DATASET2; break; + case OMIT: SPEC = SPEC5; DATASET = DATASET2; break; + } + + if( !ofmt.equals("csv") ) + throw new RuntimeException("Unsupported test output format"); + + try + { + getAndLoadTestConfiguration(TEST_NAME1); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[]{"-explain","-nvargs", + "DATA=" + HOME + "input/" + DATASET, + "TFSPEC=" + HOME + "input/" + SPEC, + "TFDATA1=" + output("tfout1"), + "TFDATA2=" + output("tfout2"), + "OFMT=" + ofmt }; + + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true; + runTest(true, false, null, -1); + + //read input/output and compare + double[][] R1 = DataConverter.convertToDoubleMatrix(MatrixReaderFactory + .createMatrixReader(InputInfo.CSVInputInfo) + .readMatrixFromHDFS(output("tfout1"), -1L, -1L, 1000, 1000, -1)); + double[][] R2 = DataConverter.convertToDoubleMatrix(MatrixReaderFactory + .createMatrixReader(InputInfo.CSVInputInfo) + .readMatrixFromHDFS(output("tfout2"), -1L, -1L, 1000, 1000, -1)); + TestUtils.compareMatrices(R1, R2, R1.length, R1[0].length, 0); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + finally { + rtplatform = rtold; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml ---------------------------------------------------------------------- diff --git
a/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml new file mode 100644 index 0000000..08c98d0 --- /dev/null +++ b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml @@ -0,0 +1,34 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- +# Encodes frame F1 per the given spec, re-applies the returned metadata M to F1, and writes both results. +F1 = read($DATA, data_type="frame", format="csv"); + +jspec = read($TFSPEC, data_type="scalar", value_type="string"); + +[X, M] = transformencode(target=F1, spec=jspec); +# NOTE(review): the empty if-block presumably separates encode and apply into distinct statement blocks - TODO confirm +if(1==1){} + +X2 = transformapply(target=F1, spec=jspec, meta=M); + +write(X, $TFDATA1, format=$OFMT); +write(X2, $TFDATA2, format=$OFMT); + http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java index 5122a60..bdcc36b 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java @@ -31,6 +31,7 @@ import org.junit.runners.Suite; TransformAndApplyTest.class, TransformEncodeDecodeTest.class, TransformFrameApplyTest.class, + TransformFrameEncodeApplyTest.class, TransformFrameEncodeDecodeTest.class, TransformReadMetaTest.class, TransformTest.class,
