Updated Branches: refs/heads/master 2a8b6c149 -> c51ef57ae
Second cut at rewriting custom Writable types to a more compact format. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c51ef57a Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c51ef57a Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c51ef57a Branch: refs/heads/master Commit: c51ef57aeecb78901e7489728367c2921f4c08d7 Parents: 2a8b6c1 Author: Josh Wills <[email protected]> Authored: Wed Feb 27 21:34:44 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Thu Nov 21 19:50:32 2013 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Sort.java | 7 +- .../lib/sort/TupleWritableComparator.java | 66 ++++++---- .../types/writable/GenericArrayWritable.java | 35 ++---- .../crunch/types/writable/TextMapWritable.java | 41 ++----- .../crunch/types/writable/TupleWritable.java | 65 +++++----- .../apache/crunch/types/writable/Writables.java | 119 ++++++++++++------- .../lib/TupleWritablePartitionerTest.java | 32 +---- .../writable/GenericArrayWritableTest.java | 25 ++-- .../crunch/types/writable/WritableTypeTest.java | 2 +- .../crunch/types/writable/WritablesTest.java | 37 ++++-- 10 files changed, 229 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java index 94ce7d8..011d9cd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java @@ -40,6 +40,7 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableType; import org.apache.crunch.types.writable.WritableTypeFamily; import org.apache.crunch.util.PartitionUtils; import org.apache.hadoop.conf.Configuration; @@ -252,7 +253,11 @@ public class Sort { if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) { builder.sortComparatorClass(ReverseWritableComparator.class); } else { - TupleWritableComparator.configureOrdering(conf, columnOrders); + WritableType[] wt = new WritableType[columnOrders.length]; + for (int i = 0; i < wt.length; i++) { + wt[i] = (WritableType) keyType.getSubTypes().get(i); + } + TupleWritableComparator.configureOrdering(conf, wt, columnOrders); builder.sortComparatorClass(TupleWritableComparator.class); } } else if (tf == AvroTypeFamily.getInstance()) { http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java index 07ee5b5..9677fc1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java @@ -17,13 +17,21 @@ */ package org.apache.crunch.lib.sort; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.util.Arrays; +import java.util.List; +import com.google.common.collect.Lists; +import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.lib.Sort.ColumnOrder; import org.apache.crunch.lib.Sort.Order; import org.apache.crunch.types.writable.TupleWritable; +import org.apache.crunch.types.writable.WritableType; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; @@ -31,36 +39,29 @@ import org.apache.hadoop.io.WritableComparator; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; +import org.apache.hadoop.io.WritableFactories; public class TupleWritableComparator extends WritableComparator implements Configurable { private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering"; private Configuration conf; + Writable[] w1; + Writable[] w2; private ColumnOrder[] columnOrders; public TupleWritableComparator() { super(TupleWritable.class, true); } - public static void configureOrdering(Configuration conf, Order... orders) { - conf.set(CRUNCH_ORDERING_PROPERTY, - Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() { - @Override - public String apply(Order o) { - return o.name(); - } - }))); - } - - public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) { - conf.set(CRUNCH_ORDERING_PROPERTY, - Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() { - @Override - public String apply(ColumnOrder o) { - return o.column() + ";" + o.order().name(); - } - }))); + public static void configureOrdering(Configuration conf, WritableType[] types, ColumnOrder[] columnOrders) { + List<String> ordering = Lists.newArrayList(); + for (int i = 0; i < types.length; i++) { + Class<?> cls = types[i].getSerializationClass(); + String order = columnOrders[i].order().name(); + ordering.add(cls.getCanonicalName() + ";" + order); + } + conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(ordering)); } @Override @@ -83,16 +84,22 @@ public class TupleWritableComparator extends WritableComparator implements Confi } else if (!ta.has(index) && tb.has(index)) { return -order; } else { - Writable v1 = ta.get(index); - Writable v2 = tb.get(index); + BytesWritable v1 = ta.get(index); + BytesWritable v2 = tb.get(index); if (v1 != v2 && (v1 != null && !v1.equals(v2))) { - if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) { - int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2); + try { + w1[index].readFields(new DataInputStream(new ByteArrayInputStream(v1.getBytes()))); + w2[index].readFields(new DataInputStream(new ByteArrayInputStream(v2.getBytes()))); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + if (w1[index] instanceof WritableComparable && w2[index] instanceof WritableComparable) { + int cmp = ((WritableComparable) w1[index]).compareTo((WritableComparable) w2[index]); if (cmp != 0) { return order * cmp; } } else { - int cmp = v1.hashCode() - v2.hashCode(); + int cmp = w1[index].hashCode() - w2[index].hashCode(); if (cmp != 0) { return order * cmp; } @@ -115,11 +122,20 @@ public class TupleWritableComparator extends WritableComparator implements Confi String ordering = conf.get(CRUNCH_ORDERING_PROPERTY); String[] columnOrderNames = ordering.split(","); columnOrders = new ColumnOrder[columnOrderNames.length]; + w1 = new Writable[columnOrderNames.length]; + w2 = new Writable[columnOrderNames.length]; for (int i = 0; i < columnOrders.length; i++) { String[] split = columnOrderNames[i].split(";"); - int column = Integer.parseInt(split[0]); + String className = split[0]; + try { + Class cls = Class.forName(className); + w1[i] = WritableFactories.newInstance(cls); + w2[i] = WritableFactories.newInstance(cls); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } Order order = Order.valueOf(split[1]); - columnOrders[i] = ColumnOrder.by(column, order); + columnOrders[i] = ColumnOrder.by(i + 1, order); } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java index 8b54008..9731ff4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java @@ -23,20 +23,17 @@ import java.io.IOException; import java.util.Arrays; import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.crunch.CrunchRuntimeException; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableUtils; /** * A {@link Writable} for marshalling/unmarshalling Collections. Note that * element order is <em>undefined</em>! * - * @param <T> The value type */ -class GenericArrayWritable<T> implements Writable { - private Writable[] values; +class GenericArrayWritable implements Writable { + private BytesWritable[] values; private Class<? extends Writable> valueClass; public GenericArrayWritable(Class<? extends Writable> valueClass) { @@ -47,43 +44,29 @@ class GenericArrayWritable<T> implements Writable { // for deserialization } - public void set(Writable[] values) { + public void set(BytesWritable[] values) { this.values = values; } - public Writable[] get() { + public BytesWritable[] get() { return values; } public void readFields(DataInput in) throws IOException { - values = new Writable[WritableUtils.readVInt(in)]; // construct values + values = new BytesWritable[WritableUtils.readVInt(in)]; // construct values if (values.length > 0) { int nulls = WritableUtils.readVInt(in); if (nulls == values.length) { return; } - String valueType = Text.readString(in); - setValueType(valueType); for (int i = 0; i < values.length - nulls; i++) { - Writable value = WritableFactories.newInstance(valueClass); + BytesWritable value = new BytesWritable(); value.readFields(in); // read a value values[i] = value; // store it in values } } } - protected void setValueType(String valueType) { - if (valueClass == null) { - try { - valueClass = Class.forName(valueType).asSubclass(Writable.class); - } catch (ClassNotFoundException e) { - throw new CrunchRuntimeException(e); - } - } else if (!valueType.equals(valueClass.getName())) { - throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass); - } - } - public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, values.length); if (values.length > 0) { @@ -95,10 +78,6 @@ class GenericArrayWritable<T> implements Writable { } WritableUtils.writeVInt(out, nulls); if (values.length - nulls > 0) { - if (valueClass == null) { - throw new IllegalStateException("Value class not set by constructor or read"); - } - Text.writeString(out, valueClass.getName()); for (int i = 0; i < values.length; i++) { if (values[i] != null) { values[i].write(out); http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java index 1ab51df..d25bd82 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java @@ -23,63 +23,46 @@ import java.io.IOException; import java.util.Map; import java.util.Set; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import com.google.common.collect.Maps; -class TextMapWritable<T extends Writable> implements Writable { +class TextMapWritable implements Writable { - private Class<T> valueClazz; - private final Map<Text, T> instance; + private final Map<Text, BytesWritable> instance; public TextMapWritable() { this.instance = Maps.newHashMap(); } - public TextMapWritable(Class<T> valueClazz) { - this.valueClazz = valueClazz; - this.instance = Maps.newHashMap(); - } - - public void put(Text txt, T value) { + public void put(Text txt, BytesWritable value) { instance.put(txt, value); } - public Set<Map.Entry<Text, T>> entrySet() { + public Set<Map.Entry<Text, BytesWritable>> entrySet() { return instance.entrySet(); } @Override public void readFields(DataInput in) throws IOException { instance.clear(); - try { - this.valueClazz = (Class<T>) Class.forName(Text.readString(in)); - } catch (ClassNotFoundException e) { - throw (IOException) new IOException("Failed map init").initCause(e); - } int entries = WritableUtils.readVInt(in); - try { - for (int i = 0; i < entries; i++) { - Text txt = new Text(); - txt.readFields(in); - T value = valueClazz.newInstance(); - value.readFields(in); - instance.put(txt, value); - } - } catch (IllegalAccessException e) { - throw (IOException) new IOException("Failed map init").initCause(e); - } catch (InstantiationException e) { - throw (IOException) new IOException("Failed map init").initCause(e); + for (int i = 0; i < entries; i++) { + Text txt = new Text(); + txt.readFields(in); + BytesWritable value = new BytesWritable(); + value.readFields(in); + instance.put(txt, value); } } @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, valueClazz.getName()); WritableUtils.writeVInt(out, instance.size()); - for (Map.Entry<Text, T> e : instance.entrySet()) { + for (Map.Entry<Text, BytesWritable> e : instance.entrySet()) { e.getKey().write(out); e.getValue().write(out); } http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java index 1c3536b..251e4f5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java @@ -17,14 +17,19 @@ */ package org.apache.crunch.types.writable; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import java.util.List; import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.io.Text; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableUtils; /** @@ -35,8 +40,9 @@ import org.apache.hadoop.io.WritableUtils; public class TupleWritable implements WritableComparable<TupleWritable> { private long written; - private Writable[] values; - + private BytesWritable[] values; + private List<Class<Writable>> writableClasses; + /** * Create an empty tuple with no allocated storage for writables. */ @@ -47,11 +53,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> { * Initialize tuple with storage; unknown whether any of them contain * "written" values. */ - public TupleWritable(Writable[] vals) { + public TupleWritable(BytesWritable[] vals) { written = 0L; values = vals; } + public void setWritableClasses(List<Class<Writable>> writableClasses) { + this.writableClasses = writableClasses; + } + /** * Return true if tuple has an element at the position provided. */ @@ -62,7 +72,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> { /** * Get ith Writable from Tuple. */ - public Writable get(int i) { + public BytesWritable get(int i) { return values[i]; } @@ -110,7 +120,19 @@ public class TupleWritable implements WritableComparable<TupleWritable> { public String toString() { StringBuffer buf = new StringBuffer("["); for (int i = 0; i < values.length; ++i) { - buf.append(has(i) ? values[i].toString() : ""); + if (has(i)) { + if (writableClasses != null) { + Writable w = WritableFactories.newInstance(writableClasses.get(i)); + try { + w.readFields(new DataInputStream(new ByteArrayInputStream(values[i].getBytes()))); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + buf.append(w.toString()); + } else { + buf.append(values[i].toString()); + } + } buf.append(","); } if (values.length != 0) @@ -131,11 +153,6 @@ public class TupleWritable implements WritableComparable<TupleWritable> { WritableUtils.writeVLong(out, written); for (int i = 0; i < values.length; ++i) { if (has(i)) { - Text.writeString(out, values[i].getClass().getName()); - } - } - for (int i = 0; i < values.length; ++i) { - if (has(i)) { values[i].write(out); } } @@ -144,31 +161,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> { /** * {@inheritDoc} */ - @SuppressWarnings("unchecked") - // No static typeinfo on Tuples public void readFields(DataInput in) throws IOException { int card = WritableUtils.readVInt(in); - values = new Writable[card]; + values = new BytesWritable[card]; written = WritableUtils.readVLong(in); - Class<? extends Writable>[] cls = new Class[card]; - try { - for (int i = 0; i < card; ++i) { - if (has(i)) { - cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class); - } - } - for (int i = 0; i < card; ++i) { - if (has(i)) { - values[i] = cls[i].newInstance(); - values[i].readFields(in); - } + for (int i = 0; i < card; ++i) { + if (has(i)) { + values[i] = new BytesWritable(); + values[i].readFields(in); } - } catch (ClassNotFoundException e) { - throw (IOException) new IOException("Failed tuple init").initCause(e); - } catch (IllegalAccessException e) { - throw (IOException) new IOException("Failed tuple init").initCause(e); - } catch (InstantiationException e) { - throw (IOException) new IOException("Failed tuple init").initCause(e); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java index 78cf3ae..0273e5e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -17,11 +17,15 @@ */ package org.apache.crunch.types.writable; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; import org.apache.crunch.Tuple; @@ -44,6 +48,8 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import com.google.common.collect.ImmutableMap; @@ -273,6 +279,16 @@ public class Writables { return new WritableTableType((WritableType) key, (WritableType) value); } + private static <W extends Writable> W create(Class<W> clazz, BytesWritable bytes) { + W instance = (W) WritableFactories.newInstance(clazz); + try { + instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes()))); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + return instance; + } + /** * For mapping from {@link TupleWritable} instances to {@link Tuple}s. * @@ -280,14 +296,17 @@ public class Writables { private static class TWTupleMapFn extends MapFn<TupleWritable, Tuple> { private final TupleFactory<?> tupleFactory; private final List<MapFn> fns; - + private final List<Class<Writable>> writableClasses; + private transient Object[] values; - public TWTupleMapFn(TupleFactory<?> tupleFactory, PType<?>... ptypes) { + public TWTupleMapFn(TupleFactory<?> tupleFactory, WritableType<?, ?>... ptypes) { this.tupleFactory = tupleFactory; this.fns = Lists.newArrayList(); - for (PType ptype : ptypes) { + this.writableClasses = Lists.newArrayList(); + for (WritableType ptype : ptypes) { fns.add(ptype.getInputMapFn()); + writableClasses.add(ptype.getSerializationClass()); } } @@ -321,7 +340,8 @@ public class Writables { public Tuple map(TupleWritable in) { for (int i = 0; i < values.length; i++) { if (in.has(i)) { - values[i] = fns.get(i).map(in.get(i)); + Writable w = create(writableClasses.get(i), in.get(i)); + values[i] = fns.get(i).map(w); } else { values[i] = null; } @@ -337,14 +357,17 @@ public class Writables { private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> { private transient TupleWritable writable; - private transient Writable[] values; + private transient BytesWritable[] values; private final List<MapFn> fns; - + private final List<Class<Writable>> writableClasses; + public TupleTWMapFn(PType<?>... ptypes) { this.fns = Lists.newArrayList(); + this.writableClasses = Lists.newArrayList(); for (PType<?> ptype : ptypes) { fns.add(ptype.getOutputMapFn()); + writableClasses.add(((WritableType) ptype).getSerializationClass()); } } @@ -364,8 +387,9 @@ public class Writables { @Override public void initialize() { - this.values = new Writable[fns.size()]; + this.values = new BytesWritable[fns.size()]; this.writable = new TupleWritable(values); + this.writable.setWritableClasses(writableClasses); for (MapFn fn : fns) { fn.initialize(); } @@ -378,7 +402,8 @@ public class Writables { Object value = input.get(i); if (value != null) { writable.setWritten(i); - values[i] = (Writable) fns.get(i).map(value); + Writable w = (Writable) fns.get(i).map(value); + values[i] = new BytesWritable(WritableUtils.toByteArray(w)); } } return writable; @@ -386,38 +411,46 @@ public class Writables { } public static <V1, V2> WritableType<Pair<V1, V2>, TupleWritable> pairs(PType<V1> p1, PType<V2> p2) { - TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, p1, p2); + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, (WritableType) p1, (WritableType) p2); TupleTWMapFn output = new TupleTWMapFn(p1, p2); return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2); } public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) { - TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3); + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, (WritableType) p1, + (WritableType) p2, (WritableType) p3); TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3); return new WritableType(Tuple3.class, TupleWritable.class, input, output, p1, p2, p3); } public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) { - TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4); + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, (WritableType) p1, + (WritableType) p2, (WritableType) p3, (WritableType) p4); TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4); return new WritableType(Tuple4.class, TupleWritable.class, input, output, p1, p2, p3, p4); } public static WritableType<TupleN, TupleWritable> tuples(PType... ptypes) { - TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, ptypes); + WritableType[] wt = new WritableType[ptypes.length]; + for (int i = 0; i < wt.length; i++) { + wt[i] = (WritableType) ptypes[i]; + } + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, wt); TupleTWMapFn output = new TupleTWMapFn(ptypes); return new WritableType(TupleN.class, TupleWritable.class, input, output, ptypes); } public static <T extends Tuple> PType<T> tuples(Class<T> clazz, PType... ptypes) { Class[] typeArgs = new Class[ptypes.length]; + WritableType[] wt = new WritableType[ptypes.length]; for (int i = 0; i < typeArgs.length; i++) { typeArgs[i] = ptypes[i].getTypeClass(); + wt[i] = (WritableType) ptypes[i]; } TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs); - TWTupleMapFn input = new TWTupleMapFn(factory, ptypes); + TWTupleMapFn input = new TWTupleMapFn(factory, wt); TupleTWMapFn output = new TupleTWMapFn(ptypes); return new WritableType(clazz, TupleWritable.class, input, output, ptypes); } @@ -430,9 +463,11 @@ public class Writables { } private static class ArrayCollectionMapFn<T> extends MapFn<GenericArrayWritable, Collection<T>> { + private Class<Writable> clazz; private final MapFn<Object, T> mapFn; - - public ArrayCollectionMapFn(MapFn<Object, T> mapFn) { + + public ArrayCollectionMapFn(Class<Writable> clazz, MapFn<Object, T> mapFn) { + this.clazz = clazz; this.mapFn = mapFn; } @@ -454,8 +489,9 @@ public class Writables { @Override public Collection<T> map(GenericArrayWritable input) { Collection<T> collection = Lists.newArrayList(); - for (Writable writable : input.get()) { - collection.add(mapFn.map(writable)); + for (BytesWritable raw : input.get()) { + Writable w = create(clazz, raw); + collection.add(mapFn.map(w)); } return collection; } @@ -463,11 +499,9 @@ public class Writables { private static class CollectionArrayMapFn<T> extends MapFn<Collection<T>, GenericArrayWritable> { - private final Class<? extends Writable> clazz; private final MapFn<T, Object> mapFn; - public CollectionArrayMapFn(Class<? extends Writable> clazz, MapFn<T, Object> mapFn) { - this.clazz = clazz; + public CollectionArrayMapFn(MapFn<T, Object> mapFn) { this.mapFn = mapFn; } @@ -488,27 +522,31 @@ public class Writables { @Override public GenericArrayWritable map(Collection<T> input) { - GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz); - Writable[] w = new Writable[input.size()]; + GenericArrayWritable arrayWritable = new GenericArrayWritable(); + BytesWritable[] w = new BytesWritable[input.size()]; int index = 0; for (T in : input) { - w[index++] = ((Writable) mapFn.map(in)); + Writable v = (Writable) mapFn.map(in); + w[index++] = new BytesWritable(WritableUtils.toByteArray(v)); } arrayWritable.set(w); return arrayWritable; } } - public static <T> WritableType<Collection<T>, GenericArrayWritable<T>> collections(PType<T> ptype) { + public static <T> WritableType<Collection<T>, GenericArrayWritable> collections(PType<T> ptype) { WritableType<T, ?> wt = (WritableType<T, ?>) ptype; - return new WritableType(Collection.class, GenericArrayWritable.class, new ArrayCollectionMapFn(wt.getInputMapFn()), - new CollectionArrayMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype); + return new WritableType(Collection.class, GenericArrayWritable.class, + new ArrayCollectionMapFn(wt.getSerializationClass(), wt.getInputMapFn()), + new CollectionArrayMapFn(wt.getOutputMapFn()), ptype); } - private static class MapInputMapFn<T> extends MapFn<TextMapWritable<Writable>, Map<String, T>> { + private static class MapInputMapFn<T> extends MapFn<TextMapWritable, Map<String, T>> { + private final Class<Writable> clazz; private final MapFn<Writable, T> mapFn; - public MapInputMapFn(MapFn<Writable, T> mapFn) { + public MapInputMapFn(Class<Writable> clazz, MapFn<Writable, T> mapFn) { + this.clazz = clazz; this.mapFn = mapFn; } @@ -528,22 +566,21 @@ public class Writables { } @Override - public Map<String, T> map(TextMapWritable<Writable> input) { + public Map<String, T> map(TextMapWritable input) { Map<String, T> out = Maps.newHashMap(); - for (Map.Entry<Text, Writable> e : input.entrySet()) { - out.put(e.getKey().toString(), mapFn.map(e.getValue())); + for (Map.Entry<Text, BytesWritable> e : input.entrySet()) { + Writable v = create(clazz, e.getValue()); + out.put(e.getKey().toString(), mapFn.map(v)); } return out; } } - private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable<Writable>> { + private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable> { - private final Class<Writable> clazz; private final MapFn<T, Writable> mapFn; - public MapOutputMapFn(Class<Writable> clazz, MapFn<T, Writable> mapFn) { - this.clazz = clazz; + public MapOutputMapFn(MapFn<T, Writable> mapFn) { this.mapFn = mapFn; } @@ -563,10 +600,11 @@ public class Writables { } @Override - public TextMapWritable<Writable> map(Map<String, T> input) { - TextMapWritable<Writable> tmw = new TextMapWritable<Writable>(clazz); + public TextMapWritable map(Map<String, T> input) { + TextMapWritable tmw = new TextMapWritable(); for (Map.Entry<String, T> e : input.entrySet()) { - tmw.put(new Text(e.getKey()), mapFn.map(e.getValue())); + Writable w = mapFn.map(e.getValue()); + tmw.put(new Text(e.getKey()), new BytesWritable(WritableUtils.toByteArray(w))); } return tmw; } @@ -574,8 +612,9 @@ public class Writables { public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) { WritableType<T, ?> wt = (WritableType<T, ?>) ptype; - return new WritableType(Map.class, TextMapWritable.class, new MapInputMapFn(wt.getInputMapFn()), - new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype); + return new WritableType(Map.class, TextMapWritable.class, + new MapInputMapFn(wt.getSerializationClass(), wt.getInputMapFn()), + new MapOutputMapFn(wt.getOutputMapFn()), ptype); } public static <T> PType<T> jsons(Class<T> clazz) { http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java index 35ccc11..e8727c3 100644 --- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java @@ -21,9 +21,10 @@ import static org.junit.Assert.assertEquals; import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner; import org.apache.crunch.types.writable.TupleWritable; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.junit.Before; import org.junit.Test; @@ -39,30 +40,9 @@ public class TupleWritablePartitionerTest { @Test public void testGetPartition() { IntWritable intWritable = new IntWritable(3); - TupleWritable key = new TupleWritable(new Writable[] { intWritable }); - assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5)); - assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2)); + BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable)); + TupleWritable key = new TupleWritable(new BytesWritable[] { bw }); + assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5)); + assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2)); } - - @Test - public void testGetPartition_NegativeHashValue() { - IntWritable intWritable = new IntWritable(-3); - // Sanity check, if this doesn't work then the premise of this test is wrong - assertEquals(-3, intWritable.hashCode()); - - TupleWritable key = new TupleWritable(new Writable[] { intWritable }); - assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5)); - assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2)); - } - - @Test - public void testGetPartition_IntegerMinValue() { - IntWritable intWritable = new IntWritable(Integer.MIN_VALUE); - // Sanity check, if this doesn't work then the premise of this test is wrong - assertEquals(Integer.MIN_VALUE, intWritable.hashCode()); - - TupleWritable key = new TupleWritable(new Writable[] { intWritable }); - assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE)); - } - } http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java index c807a90..c446a69 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertThat; import java.util.Arrays; import org.apache.crunch.test.Tests; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.junit.Test; @@ -35,36 +36,38 @@ public class GenericArrayWritableTest { @Test public void testEmpty() { - GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class); - src.set(new Text[0]); + GenericArrayWritable src = new GenericArrayWritable(); + src.set(new BytesWritable[0]); - GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>()); + GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable()); assertThat(dest.get().length, is(0)); } @Test public void testNonEmpty() { - GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class); - src.set(new Text[] { new Text("foo"), new Text("bar") }); + GenericArrayWritable src = new GenericArrayWritable(); + src.set(new BytesWritable[] { new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()) }); - GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>()); + GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable()); assertThat(src.get(), not(sameInstance(dest.get()))); assertThat(dest.get().length, is(2)); - assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("foo"), new Text("bar"))); + assertThat(Arrays.asList(dest.get()), + hasItems(new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()))); } @Test public void testNulls() { - GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class); - src.set(new Text[] { new Text("a"), null, new Text("b") }); + GenericArrayWritable src = new GenericArrayWritable(); + src.set(new BytesWritable[] { new BytesWritable("a".getBytes()), null, new BytesWritable("b".getBytes()) }); - GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>()); + GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable()); assertThat(src.get(), not(sameInstance(dest.get()))); assertThat(dest.get().length, is(3)); - assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("a"), new Text("b"), null)); + assertThat(Arrays.asList(dest.get()), + hasItems(new BytesWritable("a".getBytes()), new BytesWritable("b".getBytes()), null)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java index 65e946b..19a9bfe 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java @@ -58,7 +58,7 @@ public class WritableTypeTest { @Test public void testGetDetachedValue_Collection() { Collection<Text> textCollection = Lists.newArrayList(new Text("value")); - WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables + WritableType<Collection<Text>, GenericArrayWritable> ptype = Writables .collections(Writables.writables(Text.class)); ptype.initialize(new Configuration()); http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java index 5396fba..b1f4107 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java @@ -43,6 +43,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.junit.Test; import com.google.common.collect.Lists; @@ -111,15 +112,22 @@ public class WritablesTest { String s = "abc"; Collection<String> j = Lists.newArrayList(); j.add(s); - GenericArrayWritable<Text> w = new GenericArrayWritable<Text>(Text.class); - w.set(new Text[] { new Text(s) }); + GenericArrayWritable w = new GenericArrayWritable(); + Text t = new Text(s); + BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(t)); + w.set(new BytesWritable[] { bw }); testInputOutputFn(Writables.collections(Writables.strings()), j, w); } @Test public void testPairs() throws Exception { Pair<String, String> j = Pair.of("a", "b"); - TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), }); + Text[] t = new Text[] { new Text("a"), new Text("b"), }; + BytesWritable[] b = new BytesWritable[t.length]; + for (int i = 0; i < t.length; i++) { + b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); + } + TupleWritable w = new TupleWritable(b); w.setWritten(0); w.setWritten(1); testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), j, w); @@ -144,7 +152,12 @@ public class WritablesTest { @SuppressWarnings("rawtypes") public void testTriples() throws Exception { Tuple3 j = Tuple3.of("a", "b", "c"); - TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), }); + Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), }; + BytesWritable[] b = new BytesWritable[t.length]; + for (int i = 0; i < t.length; i++) { + b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); + } + TupleWritable w = new TupleWritable(b); w.setWritten(0); w.setWritten(1); w.setWritten(2); @@ -156,7 +169,12 @@ public class WritablesTest { @SuppressWarnings("rawtypes") public void testQuads() throws Exception { Tuple4 j = Tuple4.of("a", "b", "c", "d"); - TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), }); + Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), }; + BytesWritable[] b = new BytesWritable[t.length]; + for (int i = 0; i < t.length; i++) { + b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); + } + TupleWritable w = new TupleWritable(b); w.setWritten(0); w.setWritten(1); w.setWritten(2); @@ -169,8 +187,13 @@ public class WritablesTest { @Test public void testTupleN() throws Exception { TupleN j = new TupleN("a", "b", "c", "d", "e"); - TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), - new Text("e"), }); + Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), + new Text("e"), }; + BytesWritable[] b = new BytesWritable[t.length]; + for (int i = 0; i < t.length; i++) { + b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); + } + TupleWritable w = new TupleWritable(b); w.setWritten(0); w.setWritten(1); w.setWritten(2);
