Updated Branches: refs/heads/apache-crunch-0.8 ffe89ea3f -> 54cef7b17
CRUNCH-239: Add a Union PType. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/54cef7b1 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/54cef7b1 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/54cef7b1 Branch: refs/heads/apache-crunch-0.8 Commit: 54cef7b17ff9a8e685578067a0992efb975fe67a Parents: ffe89ea Author: Josh Wills <[email protected]> Authored: Tue Jan 7 17:28:48 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Thu Jan 9 07:41:29 2014 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/Union.java | 65 +++++++++ .../java/org/apache/crunch/lib/Cogroup.java | 45 +++--- .../org/apache/crunch/types/PTypeFamily.java | 3 + .../apache/crunch/types/UnionDeepCopier.java | 49 +++++++ .../crunch/types/avro/AvroTypeFamily.java | 6 + .../org/apache/crunch/types/avro/Avros.java | 138 +++++++++++++++++++ .../crunch/types/writable/UnionWritable.java | 72 ++++++++++ .../types/writable/WritableTypeFamily.java | 6 + .../apache/crunch/types/writable/Writables.java | 103 +++++++++++++- 9 files changed, 459 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/Union.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Union.java b/crunch-core/src/main/java/org/apache/crunch/Union.java new file mode 100644 index 0000000..6db1657 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/Union.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +/** + * Allows us to represent the combination of multiple data sources that may contain different types of data + * as a single type with an index to indicate which of the original sources the current record was from. + */ +public class Union { + + private final int index; + private final Object value; + + public Union(int index, Object value) { + this.index = index; + this.value = value; + } + + /** + * Returns the index of the original data source for this union type. + */ + public int getIndex() { + return index; + } + + /** + * Returns the underlying object value of the record. + */ + public Object getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Union that = (Union) o; + + if (index != that.index) return false; + if (value != null ? !value.equals(that.value) : that.value != null) return false; + + return true; + } + + @Override + public int hashCode() { + return 31 * index + (value != null ? 
value.hashCode() : 0); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java index 9efcb5e..8743a29 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java @@ -27,6 +27,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.TupleFactory; @@ -211,20 +212,18 @@ public class Cogroup { for (int i = 0; i < rest.length; i++) { ptypes[i + 1] = rest[i].getValueType(); } - PType<TupleN> itype = ptf.tuples(ptypes); + PType<Union> itype = ptf.unionOf(ptypes); - PTable<K, TupleN> firstInter = first.mapValues("coGroupTag1", - new CogroupFn(0, 1 + rest.length), - itype); - PTable<K, TupleN>[] inter = new PTable[rest.length]; + PTable<K, Union> firstInter = first.mapValues("coGroupTag1", + new CogroupFn(0), itype); + PTable<K, Union>[] inter = new PTable[rest.length]; for (int i = 0; i < rest.length; i++) { inter[i] = rest[i].mapValues("coGroupTag" + (i + 2), - new CogroupFn(i + 1, 1 + rest.length), - itype); + new CogroupFn(i + 1), itype); } - PTable<K, TupleN> union = firstInter.union(inter); - PGroupedTable<K, TupleN> grouped; + PTable<K, Union> union = firstInter.union(inter); + PGroupedTable<K, Union> grouped; if (numReducers > 0) { grouped = union.groupByKey(numReducers); } else { @@ -236,25 +235,21 @@ public class Cogroup { outputType); } - private static class CogroupFn<T> extends MapFn<T, TupleN> { + private static class CogroupFn<T> extends MapFn<T, Union> { private final int index; - 
private final int size; - - CogroupFn(int index, int size) { + + CogroupFn(int index) { this.index = index; - this.size = size; } @Override - public TupleN map(T input) { - Object[] v = new Object[size]; - v[index] = input; - return TupleN.of(v); + public Union map(T input) { + return new Union(index, input); } } private static class PostGroupFn<T extends Tuple> extends - MapFn<Iterable<TupleN>, T> { + MapFn<Iterable<Union>, T> { private final TupleFactory factory; private final PType[] ptypes; @@ -273,18 +268,14 @@ public class Cogroup { } @Override - public T map(Iterable<TupleN> input) { + public T map(Iterable<Union> input) { Collection[] collections = new Collection[ptypes.length]; for (int i = 0; i < ptypes.length; i++) { collections[i] = Lists.newArrayList(); } - for (TupleN t : input) { - for (int i = 0; i < ptypes.length; i++) { - if (t.get(i) != null) { - collections[i].add(ptypes[i].getDetachedValue(t.get(i))); - break; - } - } + for (Union t : input) { + int index = t.getIndex(); + collections[index].add(ptypes[index].getDetachedValue(t.getValue())); } return (T) factory.makeTuple(collections); } http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java index 9458f14..0ad324a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java @@ -27,6 +27,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; /** * An abstract factory for creating {@code PType} instances that have the same @@ -68,6 +69,8 @@ public interface PTypeFamily { <S, T> PType<T> derived(Class<T> 
clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base); + PType<Union> unionOf(PType<?>... ptypes); + <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value); /** http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java new file mode 100644 index 0000000..ba712e0 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import com.google.common.collect.Lists; +import org.apache.crunch.Union; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +public class UnionDeepCopier implements DeepCopier<Union> { + private final List<PType> elementTypes; + + public UnionDeepCopier(PType... 
elementTypes) { + this.elementTypes = Lists.newArrayList(elementTypes); + } + + @Override + public void initialize(Configuration conf) { + for (PType elementType : elementTypes) { + elementType.initialize(conf); + } + } + + @Override + public Union deepCopy(Union source) { + if (source == null) { + return null; + } + int index = source.getIndex(); + Object copy = elementTypes.get(index).getDetachedValue(source.getValue()); + return new Union(index, copy); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java index e09e173..ba8add6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java @@ -29,6 +29,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -161,4 +162,9 @@ public class AvroTypeFamily implements PTypeFamily { public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) { return Avros.derived(clazz, inputFn, outputFn, base); } + + @Override + public PType<Union> unionOf(PType<?>... 
ptypes) { + return Avros.unionOf(ptypes); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 2cf63e8..8f1dae0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -48,6 +48,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.fn.CompositeMapFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.types.CollectionDeepCopier; @@ -58,6 +59,7 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypes; import org.apache.crunch.types.TupleDeepCopier; import org.apache.crunch.types.TupleFactory; +import org.apache.crunch.types.UnionDeepCopier; import org.apache.crunch.types.writable.WritableDeepCopier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; @@ -649,6 +651,142 @@ public class Avros { ptypes), new TupleDeepCopier(clazz, ptypes), null, ptypes); } + private static class UnionRecordToTuple extends MapFn<GenericRecord, Union> { + private final List<MapFn> fns; + + public UnionRecordToTuple(PType<?>... 
ptypes) { + this.fns = Lists.newArrayList(); + for (PType<?> ptype : ptypes) { + AvroType atype = (AvroType) ptype; + fns.add(atype.getInputMapFn()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override + public void initialize() { + for (MapFn fn : fns) { + fn.initialize(); + } + } + + @Override + public Union map(GenericRecord input) { + int index = (Integer) input.get(0); + return new Union(index, fns.get(index).map(input.get(1))); + } + } + + private static class TupleToUnionRecord extends MapFn<Union, GenericRecord> { + private final List<MapFn> fns; + private final List<AvroType> avroTypes; + private final String jsonSchema; + private final boolean isReflect; + private transient Schema schema; + + public TupleToUnionRecord(Schema schema, PType<?>... ptypes) { + this.fns = Lists.newArrayList(); + this.avroTypes = Lists.newArrayList(); + this.jsonSchema = schema.toString(); + boolean reflectFound = false; + boolean specificFound = false; + for (PType ptype : ptypes) { + AvroType atype = (AvroType) ptype; + fns.add(atype.getOutputMapFn()); + avroTypes.add(atype); + if (atype.hasReflect()) { + reflectFound = true; + } + if (atype.hasSpecific()) { + specificFound = true; + } + } + if (specificFound && reflectFound) { + checkCombiningSpecificAndReflectionSchemas(); + } + this.isReflect = reflectFound; + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override + public void initialize() { + this.schema = new Schema.Parser().parse(jsonSchema); + for (MapFn fn : fns) { + fn.initialize(); + } + } + + private GenericRecord 
createRecord() { + if (isReflect) { + return new ReflectGenericRecord(schema); + } else { + return new GenericData.Record(schema); + } + } + + @Override + public GenericRecord map(Union input) { + GenericRecord record = createRecord(); + int index = input.getIndex(); + record.put(0, index); + record.put(1, fns.get(index).map(input.getValue())); + return record; + } + } + + public static PType<Union> unionOf(PType<?>... ptypes) { + List<Schema> schemas = Lists.newArrayList(); + MessageDigest md; + try { + md = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + for (int i = 0; i < ptypes.length; i++) { + AvroType atype = (AvroType) ptypes[i]; + Schema schema = atype.getSchema(); + if (!schemas.contains(schema)) { + schemas.add(schema); + md.update(schema.toString().getBytes(Charsets.UTF_8)); + } + } + List<Schema.Field> fields = Lists.newArrayList( + new Schema.Field("index", Schema.create(Type.INT), "", null), + new Schema.Field("value", Schema.createUnion(schemas), "", null)); + + String schemaName = "union" + Base64.encodeBase64URLSafeString(md.digest()).replace('-', 'x'); + Schema schema = Schema.createRecord(schemaName, "", "crunch", false); + schema.setFields(fields); + return new AvroType<Union>(Union.class, schema, new UnionRecordToTuple(ptypes), + new TupleToUnionRecord(schema, ptypes), new UnionDeepCopier(ptypes), null, ptypes); + } + private static Schema createTupleSchema(PType<?>... 
ptypes) throws RuntimeException { // Guarantee each tuple schema has a globally unique name List<Schema.Field> fields = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java new file mode 100644 index 0000000..b88632a --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.types.writable; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class UnionWritable implements WritableComparable<UnionWritable> { + + private int index; + private BytesWritable value; + + public UnionWritable() { + // no-arg constructor for writables + } + + public UnionWritable(int index, BytesWritable value) { + this.index = index; + this.value = value; + } + + public int getIndex() { + return index; + } + + public BytesWritable getValue() { + return value; + } + + @Override + public int compareTo(UnionWritable other) { + if (index == other.getIndex()) { + return value.compareTo(other.getValue()); + } + return index - other.getIndex(); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, index); + value.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.index = WritableUtils.readVInt(in); + if (value == null) { + value = new BytesWritable(); + } + value.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java index a94db96..5754b4d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java @@ -27,6 +27,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; 
import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -144,4 +145,9 @@ public class WritableTypeFamily implements PTypeFamily { public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) { return Writables.derived(clazz, inputFn, outputFn, base); } + + @Override + public PType<Union> unionOf(PType<?>... ptypes) { + return Writables.unionOf(ptypes); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/54cef7b1/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java index 0273e5e..d8ad6ca 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -32,6 +32,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.fn.CompositeMapFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.types.PType; @@ -288,7 +289,7 @@ public class Writables { } return instance; } - + /** * For mapping from {@link TupleWritable} instances to {@link Tuple}s. * @@ -455,6 +456,106 @@ public class Writables { return new WritableType(clazz, TupleWritable.class, input, output, ptypes); } + /** + * For mapping from {@link UnionWritable} instances to {@link Union}s. + * + */ + private static class UWInputFn extends MapFn<UnionWritable, Union> { + private final List<MapFn> fns; + private final List<Class<Writable>> writableClasses; + + public UWInputFn(WritableType<?, ?>... 
ptypes) { + this.fns = Lists.newArrayList(); + this.writableClasses = Lists.newArrayList(); + for (WritableType ptype : ptypes) { + fns.add(ptype.getInputMapFn()); + writableClasses.add(ptype.getSerializationClass()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override + public void initialize() { + for (MapFn fn : fns) { + fn.initialize(); + } + } + + @Override + public Union map(UnionWritable in) { + int index = in.getIndex(); + Writable w = create(writableClasses.get(index), in.getValue()); + return new Union(index, fns.get(index).map(w)); + } + } + + /** + * For mapping from {@code Union}s to {@code UnionWritable}s. + * + */ + private static class UWOutputFn extends MapFn<Union, UnionWritable> { + + private final List<MapFn> fns; + + public UWOutputFn(PType<?>... ptypes) { + this.fns = Lists.newArrayList(); + for (PType<?> ptype : ptypes) { + fns.add(ptype.getOutputMapFn()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override + public void initialize() { + for (MapFn fn : fns) { + fn.initialize(); + } + } + + @Override + public UnionWritable map(Union input) { + int index = input.getIndex(); + Writable w = (Writable) fns.get(index).map(input.getValue()); + return new UnionWritable(index, new BytesWritable(WritableUtils.toByteArray(w))); + } + } + + public static PType<Union> unionOf(PType<?>... 
ptypes) { + WritableType[] wt = new WritableType[ptypes.length]; + for (int i = 0; i < wt.length; i++) { + wt[i] = (WritableType) ptypes[i]; + } + UWInputFn input= new UWInputFn(wt); + UWOutputFn output = new UWOutputFn(ptypes); + return new WritableType(Union.class, UnionWritable.class, input, output, ptypes); + } + public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) { WritableType<S, ?> wt = (WritableType<S, ?>) base; MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);
