[SYSTEMML-613] Extended transform encoder api and cleanup (dep spark) This patch generalizes the transform encoder API so that transformencode is also supported over frames. Furthermore, it cleans up the existing agents (recode, dummycode, etc.) to confine Scala dependencies to the Spark backend: the agents now use SystemML's own Pair class instead of scala.Tuple2, and the conversion to Tuple2 happens only in GenTfMtdSPARK.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/17b52217 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/17b52217 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/17b52217 Branch: refs/heads/master Commit: 17b52217cd05c56efdd96580c44700ee28a5fa9c Parents: 8e8283b Author: Matthias Boehm <[email protected]> Authored: Thu Apr 7 13:29:13 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Apr 7 13:30:30 2016 -0700 ---------------------------------------------------------------------- .../sysml/runtime/transform/BinAgent.java | 44 ++++++-- .../sysml/runtime/transform/DummycodeAgent.java | 32 ++++++ .../sysml/runtime/transform/GenTfMtdSPARK.java | 20 +++- .../sysml/runtime/transform/MVImputeAgent.java | 37 +++++- .../sysml/runtime/transform/OmitAgent.java | 27 +++++ .../sysml/runtime/transform/RecodeAgent.java | 102 ++++++++++++++++- .../sysml/runtime/transform/encode/Encoder.java | 43 +++++++ .../transform/encode/EncoderComposite.java | 113 +++++++++++++++++++ .../transform/encode/EncoderFactory.java | 77 +++++++++++++ 9 files changed, 474 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java index 4f24afd..35bfd5a 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java @@ -37,9 +37,9 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import 
org.apache.wink.json4j.JSONObject; - -import scala.Tuple2; - +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; @@ -152,7 +152,7 @@ public class BinAgent extends Encoder } } - public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { + public ArrayList<Pair<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException { if ( !isApplicable() ) return list; @@ -161,9 +161,9 @@ public class BinAgent extends Encoder int colID = _colList[i]; Integer iw = -colID; - list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) ); - list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) ); - list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) ); + list.add( new Pair<Integer,DistinctValue>(iw, prepMinOutput(i)) ); + list.add( new Pair<Integer,DistinctValue>(iw, prepMaxOutput(i)) ); + list.add( new Pair<Integer,DistinctValue>(iw, prepNBinsOutput(i)) ); } } catch(Exception e) { throw new IOException(e); @@ -317,4 +317,34 @@ public class BinAgent extends Encoder return words; } + + @Override + public double[] encode(String[] in, double[] out) { + // TODO Auto-generated method stub + return null; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void build(String[] in) { + // TODO Auto-generated method stub + + } + + @Override + public void build(FrameBlock in) { + // TODO Auto-generated method stub + + } + + @Override + public FrameBlock 
getMetaData(FrameBlock out) { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java index 9ceb368..cc51a51 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java @@ -41,6 +41,8 @@ import org.apache.wink.json4j.JSONObject; import com.google.common.base.Functions; import com.google.common.collect.Ordering; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; @@ -393,4 +395,34 @@ public class DummycodeAgent extends Encoder return nwords; } + + @Override + public double[] encode(String[] in, double[] out) { + // TODO Auto-generated method stub + return null; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void build(String[] in) { + // TODO Auto-generated method stub + + } + + @Override + public void build(FrameBlock in) { + // TODO Auto-generated method stub + + } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java 
b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java index e6bb08c..a56f6ef 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java +++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,6 +43,7 @@ import scala.Tuple2; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; +import org.apache.sysml.runtime.matrix.data.Pair; public class GenTfMtdSPARK { @@ -129,17 +131,17 @@ public class GenTfMtdSPARK // Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated. - ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>(); + ArrayList<Pair<Integer,DistinctValue>> outList = new ArrayList<Pair<Integer,DistinctValue>>(); _agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); _agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); _agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal())); - Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); + Pair<Integer, DistinctValue> tuple = new Pair<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); outList.add(tuple); - return outList.iterator(); + return toTuple2List(outList).iterator(); } } @@ -211,4 +213,16 @@ public class GenTfMtdSPARK return numRows; } } + + /** + * + * @param in + * @return + */ + public static 
List<Tuple2<Integer,DistinctValue>> toTuple2List(List<Pair<Integer,DistinctValue>> in) { + ArrayList<Tuple2<Integer,DistinctValue>> ret = new ArrayList<Tuple2<Integer,DistinctValue>>(); + for( Pair<Integer,DistinctValue> e : in ) + ret.add(new Tuple2<Integer,DistinctValue>(e.getKey(), e.getValue())); + return ret; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java index 5cadef6..76419e6 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java @@ -38,15 +38,15 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; - -import scala.Tuple2; - import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.functionobjects.CM; import org.apache.sysml.runtime.functionobjects.KahanPlus; import org.apache.sysml.runtime.functionobjects.Mean; import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.matrix.operators.CMOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; import org.apache.sysml.runtime.transform.encode.Encoder; @@ -468,12 +468,12 @@ public class MVImputeAgent extends Encoder * @throws IOException */ - private void addDV(Integer iw, DistinctValue dv, ArrayList<Tuple2<Integer, DistinctValue>> list) throws 
IOException { + private void addDV(Integer iw, DistinctValue dv, ArrayList<Pair<Integer, DistinctValue>> list) throws IOException { if ( dv != null ) - list.add( new Tuple2<Integer, DistinctValue>(iw, dv) ); + list.add( new Pair<Integer, DistinctValue>(iw, dv) ); } - public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { + public ArrayList<Pair<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException { try { StringBuilder sb = new StringBuilder(); DistinctValue dv = null; @@ -939,4 +939,29 @@ public class MVImputeAgent extends Encoder int idx = isApplicable(colID); return (idx == -1) ? null : _replacementList[idx]; } + @Override + public double[] encode(String[] in, double[] out) { + // TODO Auto-generated method stub + return null; + } + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + // TODO Auto-generated method stub + return null; + } + @Override + public void build(String[] in) { + // TODO Auto-generated method stub + + } + @Override + public void build(FrameBlock in) { + // TODO Auto-generated method stub + + } + @Override + public FrameBlock getMetaData(FrameBlock out) { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java index c169129..a4eb715 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java @@ -30,6 +30,8 @@ import org.apache.hadoop.mapred.OutputCollector; import 
org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; @@ -89,5 +91,30 @@ public class OmitAgent extends Encoder public String[] apply(String[] words) { return null; } + + @Override + public double[] encode(String[] in, double[] out) { + return null; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + return null; + } + + @Override + public void build(String[] in) { + + } + + @Override + public void build(FrameBlock in) { + + } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java index 1f9749d..49961a2 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,12 +39,11 @@ import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; -import scala.Tuple2; - import com.google.common.collect.Ordering; import org.apache.sysml.lops.Lop; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; import 
org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; import org.apache.sysml.runtime.transform.decode.DecoderRecode; @@ -182,12 +182,12 @@ public class RecodeAgent extends Encoder mapOutputHelper(taskID, out, null, agents); } - public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { + public ArrayList<Pair<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException { mapOutputHelper(taskID, null, list, agents); return list; } - public void mapOutputHelper(int taskID, OutputCollector<IntWritable, DistinctValue> out, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { + public void mapOutputHelper(int taskID, OutputCollector<IntWritable, DistinctValue> out, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException { if ( _colList == null && _mvrcdList == null ) return; @@ -209,7 +209,7 @@ public class RecodeAgent extends Encoder } else if ( list != null ) { for(String s : map.keySet()) - list.add(new Tuple2<Integer,DistinctValue>(colID, new DistinctValue(s, map.get(s))) ); + list.add(new Pair<Integer,DistinctValue>(colID, new DistinctValue(s, map.get(s))) ); } } } @@ -441,5 +441,97 @@ public class RecodeAgent extends Encoder return (tmp!=null) ? 
Long.toString(tmp) : null; } } + + @Override + public double[] encode(String[] in, double[] out) { + if( !isApplicable() ) + return out; + + //build and apply recode maps + build(in); + apply(in); + + //convert to double + for( int j=0; j<_colList.length; j++ ) + out[_colList[j]-1] = Double.parseDouble(in[_colList[j]-1]); + return out; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + if( !isApplicable() ) + return out; + + //build recode maps + build(in); + + //apply created recode maps + Iterator<String[]> iter = in.getStringRowIterator(); + for( int i=0; iter.hasNext(); i++ ) { + String[] row = apply( iter.next() ); + for( int j=0; j<_colList.length; j++ ) { + double val = Double.parseDouble(row[_colList[j]-1]); + out.quickSetValue(i, _colList[j]-1, val); + } + } + + return out; + } + + @Override + public void build(String[] in) { + if( !isApplicable() ) + return; + + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based + //allocate column map if necessary + if( !_rcdMaps.containsKey(colID) ) + _rcdMaps.put(colID, new HashMap<String,Long>()); + //probe and build column map + HashMap<String,Long> map = _rcdMaps.get(colID); + String key = in[colID-1]; + if( !map.containsKey(key) ) + map.put(key, new Long(map.size()+1)); + } + } + + @Override + public void build(FrameBlock in) { + if( !isApplicable() ) + return; + + Iterator<String[]> iter = in.getStringRowIterator(); + while( iter.hasNext() ) + build( iter.next() ); + } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + if( !isApplicable() ) + return out; + + //inverse operation to initRecodeMaps + + //allocate output rows + int maxDistinct = 0; + for( int j=0; j<_colList.length; j++ ) + if( _rcdMaps.containsKey(_colList[j]) ) + maxDistinct = Math.max(maxDistinct, _rcdMaps.get(_colList[j]).size()); + out.ensureAllocatedColumns(maxDistinct); + + //create compact meta data representation + for( int j=0; j<_colList.length; j++ ) { + int colID = 
_colList[j]; //1-based + int rowID = 0; + if( _rcdMaps.containsKey(_colList[j]) ) + for( Entry<String, Long> e : _rcdMaps.get(colID).entrySet() ) { + String tmp = e.getKey() + Lop.DATATYPE_PREFIX + e.getValue().toString(); + out.set(rowID++, colID-1, tmp); + } + } + + return out; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java index 32694c9..481832b 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java @@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.transform.DistinctValue; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.UtilFunctions; @@ -87,6 +89,47 @@ public abstract class Encoder implements Serializable } /** + * Row encode: build and apply (transform encode). + * + * @param in + * @param out + * @return + */ + public abstract double[] encode(String[] in, double[] out); + + /** + * Block encode: build and apply (transform encode). + * + * @param in + * @param out + * @return + */ + public abstract MatrixBlock encode(FrameBlock in, MatrixBlock out); + + /** + * Build the transform meta data for given row input. This call modifies + * and keeps meta data as encoder state. 
+ * + * @param in + */ + public abstract void build(String[] in); + + /** + * Build the transform meta data for the given block input. This call modifies + * and keeps meta data as encoder state. + * + * @param in + */ + public abstract void build(FrameBlock in); + + /** + * Construct a frame block out of the transform meta data. + * + * @return + */ + public abstract FrameBlock getMetaData(FrameBlock out); + + /** * Encode input data according to existing transform meta * data (transform apply). * http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java new file mode 100644 index 0000000..3dc11df --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.transform.encode; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.transform.DistinctValue; +import org.apache.sysml.runtime.transform.TfUtils; + +/** + * Simple composite encoder that applies a list of encoders + * in specified order. By implementing the default encoder API + * it can be used as a drop-in replacement for any other encoder. + * + */ +public class EncoderComposite extends Encoder +{ + private static final long serialVersionUID = -8473768154646831882L; + + private List<Encoder> _encoders = null; + + protected EncoderComposite(List<Encoder> encoders) { + super(null); + _encoders = encoders; + } + + protected EncoderComposite(Encoder[] encoders) { + super(null); + _encoders = Arrays.asList(encoders); + } + + @Override + public double[] encode(String[] in, double[] out) { + for( Encoder encoder : _encoders ) + encoder.encode(in, out); + return out; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + for( Encoder encoder : _encoders ) + encoder.encode(in, out); + return out; + } + + @Override + public void build(String[] in) { + for( Encoder encoder : _encoders ) + encoder.build(in); + } + + @Override + public void build(FrameBlock in) { + for( Encoder encoder : _encoders ) + encoder.build(in); + } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + for( Encoder encoder : _encoders ) + encoder.getMetaData(out); + return out; + } + + @Override + public String[] apply(String[] in) { + for( Encoder encoder : _encoders ) + encoder.apply(in); + return in; + } + + 
@Override + public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { + throw new RuntimeException("File-based api not supported."); + } + + @Override + public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { + throw new RuntimeException("File-based api not supported."); + } + + @Override + public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { + throw new RuntimeException("File-based api not supported."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java new file mode 100644 index 0000000..cb5b02e --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform.encode; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.transform.RecodeAgent; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.wink.json4j.JSONObject; + +public class EncoderFactory +{ + /** + * + * @param spec + * @param clen + * @return + * @throws DMLRuntimeException + */ + public static Encoder createEncoder(String spec, int clen) throws DMLRuntimeException { + return createEncoder(spec, Collections.nCopies(clen, ValueType.STRING)); + } + + + /** + * + * @param spec + * @param schema + * @return + * @throws DMLRuntimeException + */ + public static Encoder createEncoder(String spec, List<ValueType> schema) + throws DMLRuntimeException + { + Encoder encoder = null; + + try { + //parse transform specification + JSONObject jSpec = new JSONObject(spec); + List<Encoder> lencoders = new ArrayList<Encoder>(); + + //create encoder 'recode' + if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { + lencoders.add(new RecodeAgent(jSpec)); + } + + //create composite decoder of all created decoders + encoder = new EncoderComposite(lencoders); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + return encoder; + } +}
