Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 1a34cd097 -> d053d2624
CRUNCH-329: Fix secondary sorts for writables by re-introducing type info. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d053d262 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d053d262 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d053d262 Branch: refs/heads/apache-crunch-0.8 Commit: d053d262418f7332e8aba809e7cb3b3381d07bb2 Parents: 1a34cd0 Author: Josh Wills <[email protected]> Authored: Wed Jan 22 19:54:45 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Feb 14 15:04:29 2014 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/lib/SecondarySortIT.java | 19 ++- .../lib/sort/TupleWritableComparator.java | 44 +----- .../crunch/types/writable/TupleWritable.java | 127 ++++++++------- .../apache/crunch/types/writable/Writables.java | 158 ++++++++++++++++--- .../lib/TupleWritablePartitionerTest.java | 3 +- .../crunch/types/writable/WritablesTest.java | 39 +---- 6 files changed, 236 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d053d262/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java index 242f621..7284ab1 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java @@ -17,7 +17,6 @@ */ package org.apache.crunch.lib; -import static org.apache.crunch.types.avro.Avros.*; import static org.junit.Assert.assertEquals; import java.io.Serializable; @@ -29,6 +28,9 @@ import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.From; import 
org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.Test; import com.google.common.base.Joiner; @@ -38,7 +40,16 @@ import com.google.common.collect.ImmutableList; public class SecondarySortIT extends CrunchTestSupport implements Serializable { @Test - public void testSecondarySort() throws Exception { + public void testSecondarySortAvros() throws Exception { + runSecondarySort(AvroTypeFamily.getInstance()); + } + + @Test + public void testSecondarySortWritables() throws Exception { + runSecondarySort(WritableTypeFamily.getInstance()); + } + + public void runSecondarySort(PTypeFamily ptf) throws Exception { Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration()); String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt"); @@ -50,14 +61,14 @@ public class SecondarySortIT extends CrunchTestSupport implements Serializable { return Pair.of(pieces[0], Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim()))); } - }, tableOf(strings(), pairs(ints(), ints()))); + }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.ints(), ptf.ints()))); Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() { @Override public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) { Joiner j = Joiner.on(','); return j.join(input.first(), j.join(input.second())); } - }, strings()).materialize(); + }, ptf.strings()).materialize(); assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"), ImmutableList.copyOf(lines)); p.done(); http://git-wip-us.apache.org/repos/asf/crunch/blob/d053d262/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java index 9677fc1..9d16821 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java @@ -17,37 +17,26 @@ */ package org.apache.crunch.lib.sort; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.util.Arrays; import java.util.List; import com.google.common.collect.Lists; -import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.lib.Sort.ColumnOrder; import org.apache.crunch.lib.Sort.Order; import org.apache.crunch.types.writable.TupleWritable; import org.apache.crunch.types.writable.WritableType; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; -import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; -import org.apache.hadoop.io.WritableFactories; public class TupleWritableComparator extends WritableComparator implements Configurable { private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering"; private Configuration conf; - Writable[] w1; - Writable[] w2; private ColumnOrder[] columnOrders; public TupleWritableComparator() { @@ -57,9 +46,7 @@ public class TupleWritableComparator extends WritableComparator implements Confi public static void configureOrdering(Configuration conf, WritableType[] types, ColumnOrder[] columnOrders) { List<String> ordering = Lists.newArrayList(); for (int i = 0; i < types.length; i++) { - Class<?> cls = types[i].getSerializationClass(); - String order = columnOrders[i].order().name(); 
- ordering.add(cls.getCanonicalName() + ";" + order); + ordering.add(columnOrders[i].order().name()); } conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(ordering)); } @@ -84,22 +71,16 @@ public class TupleWritableComparator extends WritableComparator implements Confi } else if (!ta.has(index) && tb.has(index)) { return -order; } else { - BytesWritable v1 = ta.get(index); - BytesWritable v2 = tb.get(index); + Writable v1 = ta.get(index); + Writable v2 = tb.get(index); if (v1 != v2 && (v1 != null && !v1.equals(v2))) { - try { - w1[index].readFields(new DataInputStream(new ByteArrayInputStream(v1.getBytes()))); - w2[index].readFields(new DataInputStream(new ByteArrayInputStream(v2.getBytes()))); - } catch (IOException e) { - throw new CrunchRuntimeException(e); - } - if (w1[index] instanceof WritableComparable && w2[index] instanceof WritableComparable) { - int cmp = ((WritableComparable) w1[index]).compareTo((WritableComparable) w2[index]); + if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) { + int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2); if (cmp != 0) { return order * cmp; } } else { - int cmp = w1[index].hashCode() - w2[index].hashCode(); + int cmp = v1.hashCode() - v2.hashCode(); if (cmp != 0) { return order * cmp; } @@ -122,19 +103,8 @@ public class TupleWritableComparator extends WritableComparator implements Confi String ordering = conf.get(CRUNCH_ORDERING_PROPERTY); String[] columnOrderNames = ordering.split(","); columnOrders = new ColumnOrder[columnOrderNames.length]; - w1 = new Writable[columnOrderNames.length]; - w2 = new Writable[columnOrderNames.length]; for (int i = 0; i < columnOrders.length; i++) { - String[] split = columnOrderNames[i].split(";"); - String className = split[0]; - try { - Class cls = Class.forName(className); - w1[i] = WritableFactories.newInstance(cls); - w2[i] = WritableFactories.newInstance(cls); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - Order 
order = Order.valueOf(split[1]); + Order order = Order.valueOf(columnOrderNames[i]); columnOrders[i] = ColumnOrder.by(i + 1, order); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d053d262/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java index 251e4f5..1362132 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java @@ -17,16 +17,16 @@ */ package org.apache.crunch.types.writable; -import java.io.ByteArrayInputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import java.util.List; +import java.util.Arrays; +import com.google.common.base.Preconditions; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.CrunchRuntimeException; -import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableFactories; @@ -37,42 +37,64 @@ import org.apache.hadoop.io.WritableUtils; * added here because of its package visibility restrictions. * */ -public class TupleWritable implements WritableComparable<TupleWritable> { +public class TupleWritable extends Configured implements WritableComparable<TupleWritable> { + + private int[] written; + private Writable[] values; - private long written; - private BytesWritable[] values; - private List<Class<Writable>> writableClasses; - /** * Create an empty tuple with no allocated storage for writables. 
*/ public TupleWritable() { } + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf == null) return; + + try { + Writables.reloadWritableComparableCodes(conf); + } catch (Exception e) { + throw new CrunchRuntimeException("Error reloading writable comparable codes", e); + } + } + + private static int[] getCodes(Writable[] writables) { + int[] b = new int[writables.length]; + for (int i = 0; i < b.length; i++) { + if (writables[i] != null) { + b[i] = getCode(writables[i].getClass()); + } + } + return b; + } + + public TupleWritable(Writable[] values) { + this(values, getCodes(values)); + } + /** * Initialize tuple with storage; unknown whether any of them contain * "written" values. */ - public TupleWritable(BytesWritable[] vals) { - written = 0L; - values = vals; + public TupleWritable(Writable[] values, int[] written) { + Preconditions.checkArgument(values.length == written.length); + this.written = written; + this.values = values; } - public void setWritableClasses(List<Class<Writable>> writableClasses) { - this.writableClasses = writableClasses; - } - /** * Return true if tuple has an element at the position provided. */ public boolean has(int i) { - return 0 != ((1 << i) & written); + return written[i] != 0; } /** * Get ith Writable from Tuple. 
*/ - public BytesWritable get(int i) { + public Writable get(int i) { return values[i]; } @@ -89,13 +111,13 @@ public class TupleWritable implements WritableComparable<TupleWritable> { public boolean equals(Object other) { if (other instanceof TupleWritable) { TupleWritable that = (TupleWritable) other; - if (this.size() != that.size() || this.written != that.written) { + if (this.size() != that.size()) { return false; } for (int i = 0; i < values.length; ++i) { if (!has(i)) continue; - if (!values[i].equals(that.get(i))) { + if (written[i] != that.written[i] || !values[i].equals(that.values[i])) { return false; } } @@ -121,17 +143,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> { StringBuffer buf = new StringBuffer("["); for (int i = 0; i < values.length; ++i) { if (has(i)) { - if (writableClasses != null) { - Writable w = WritableFactories.newInstance(writableClasses.get(i)); - try { - w.readFields(new DataInputStream(new ByteArrayInputStream(values[i].getBytes()))); - } catch (IOException e) { - throw new CrunchRuntimeException(e); - } - buf.append(w.toString()); - } else { - buf.append(values[i].toString()); - } + buf.append(values[i].toString()); } buf.append(","); } @@ -142,6 +154,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> { return buf.toString(); } + public void clear() { + Arrays.fill(written, (byte) 0); + } + + public void set(int index, Writable w) { + written[index] = getCode(w.getClass()); + values[index] = w; + } + /** * Writes each Writable to <code>out</code>. 
TupleWritable format: * {@code @@ -150,9 +171,9 @@ public class TupleWritable implements WritableComparable<TupleWritable> { */ public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, values.length); - WritableUtils.writeVLong(out, written); for (int i = 0; i < values.length; ++i) { - if (has(i)) { + WritableUtils.writeVInt(out, written[i]); + if (written[i] != 0) { values[i].write(out); } } @@ -163,36 +184,32 @@ public class TupleWritable implements WritableComparable<TupleWritable> { */ public void readFields(DataInput in) throws IOException { int card = WritableUtils.readVInt(in); - values = new BytesWritable[card]; - written = WritableUtils.readVLong(in); + values = new Writable[card]; + written = new int[card]; for (int i = 0; i < card; ++i) { - if (has(i)) { - values[i] = new BytesWritable(); + written[i] = WritableUtils.readVInt(in); + if (written[i] != 0) { + values[i] = getWritable(written[i], getConf()); values[i].readFields(in); } } } - /** - * Record that the tuple contains an element at the position provided. - */ - public void setWritten(int i) { - written |= 1 << i; - } - - /** - * Record that the tuple does not contain an element at the position provided. - */ - public void clearWritten(int i) { - written &= -1 ^ (1 << i); + static int getCode(Class<? extends Writable> clazz) { + if (Writables.WRITABLE_CODES.inverse().containsKey(clazz)) { + return Writables.WRITABLE_CODES.inverse().get(clazz); + } else { + return 1; // default for BytesWritable + } } - /** - * Clear any record of which writables have been written to, without releasing - * storage. - */ - public void clearWritten() { - written = 0L; + static Writable getWritable(int code, Configuration conf) { + Class<? 
extends Writable> clazz = Writables.WRITABLE_CODES.get(code); + if (clazz != null) { + return WritableFactories.newInstance(clazz, conf); + } else { + throw new IllegalStateException("Unknown Writable code: " + code); + } } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/d053d262/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java index d8ad6ca..a121ae3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -18,13 +18,23 @@ package org.apache.crunch.types.writable; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableBiMap; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; @@ -49,6 +59,7 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.TaskInputOutputContext; @@ -63,6 +74,78 @@ import 
com.google.common.collect.Maps; * */ public class Writables { + + private static final Log LOG = LogFactory.getLog(Writables.class); + + static BiMap<Integer, Class<? extends Writable>> WRITABLE_CODES = HashBiMap.create(ImmutableBiMap.<Integer, Class<? extends Writable>>builder() + .put(1, BytesWritable.class) + .put(2, Text.class) + .put(3, IntWritable.class) + .put(4, LongWritable.class) + .put(5, FloatWritable.class) + .put(6, DoubleWritable.class) + .put(7, BooleanWritable.class) + .put(8, TupleWritable.class) + .put(9, TextMapWritable.class) + .put(10, UnionWritable.class) + .build()); + + /** + * Registers a {@code WritableComparable} class so that it can be used for comparing the fields inside of + * tuple types (e.g., {@code pairs}, {@code trips}, {@code tupleN}, etc.) for use in sorts and + * secondary sorts. + * + * @param clazz The WritableComparable class to register + * @return the integer code that was assigned to serialized instances of this class + */ + public static void registerComparable(Class<? extends WritableComparable> clazz) { + int code = clazz.hashCode(); + if (code < 0) { + code = -code; + } + if (code < WRITABLE_CODES.size()) { + code += WRITABLE_CODES.size(); + } + registerComparable(clazz, code); + } + + /** + * Registers a {@code WritableComparable} class with a given integer code to use for serializing + * and deserializing instances of this class that are defined inside of tuple types (e.g., {@code pairs}, + * {@code trips}, {@code tupleN}, etc.) Unregistered Writables are always serialized to bytes and + * cannot be used in comparisons (e.g., sorts and secondary sorts) according to their underlying types. + * + * @param clazz The class to register + * @param code The unique registration code for the class, which must be greater than or equal to 8 + */ + public static void registerComparable(Class<? 
extends WritableComparable> clazz, int code) { + if (WRITABLE_CODES.containsKey(code)) { + throw new IllegalArgumentException("Already have writable class assigned to code = " + code); + } + } + + private static final String WRITABLE_COMPARABLE_CODES = "crunch.writable.comparable.codes"; + + private static void serializeWritableComparableCodes(Configuration conf) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(WRITABLE_CODES); + oos.close(); + conf.set(WRITABLE_COMPARABLE_CODES, Base64.encodeBase64String(baos.toByteArray())); + } + + static void reloadWritableComparableCodes(Configuration conf) throws Exception { + if (conf.get(WRITABLE_COMPARABLE_CODES) != null) { + ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(conf.get(WRITABLE_COMPARABLE_CODES))); + ObjectInputStream ois = new ObjectInputStream(bais); + BiMap<Integer, Class<? extends Writable>> codes = (BiMap<Integer, Class<? extends Writable>>) ois.readObject(); + ois.close(); + for (Map.Entry<Integer, Class<? 
extends Writable>> e : codes.entrySet()) { + WRITABLE_CODES.put(e.getKey(), e.getValue()); + } + } + } + private static final MapFn<NullWritable, Void> NULL_WRITABLE_TO_VOID = new MapFn<NullWritable, Void>() { @Override public Void map(NullWritable input) { @@ -280,14 +363,19 @@ public class Writables { return new WritableTableType((WritableType) key, (WritableType) value); } - private static <W extends Writable> W create(Class<W> clazz, BytesWritable bytes) { - W instance = (W) WritableFactories.newInstance(clazz); - try { - instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes()))); - } catch (IOException e) { - throw new CrunchRuntimeException(e); + private static <W extends Writable> W create(Class<W> clazz, Writable writable) { + if (clazz.equals(writable.getClass())) { + return (W) writable; + } else { + W instance = (W) WritableFactories.newInstance(clazz); + BytesWritable bytes = (BytesWritable) writable; + try { + instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes()))); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + return instance; } - return instance; } /** @@ -307,12 +395,26 @@ public class Writables { this.writableClasses = Lists.newArrayList(); for (WritableType ptype : ptypes) { fns.add(ptype.getInputMapFn()); - writableClasses.add(ptype.getSerializationClass()); + Class<Writable> clazz = ptype.getSerializationClass(); + if (WritableComparable.class.isAssignableFrom(clazz)) { + if (!WRITABLE_CODES.inverse().containsKey(clazz)) { + LOG.warn(String.format( + "WritableComparable class %s in tuple type should be registered with Writables.registerComparable", + clazz.toString())); + } + } + writableClasses.add(clazz); } } @Override public void configure(Configuration conf) { + try { + serializeWritableComparableCodes(conf); + } catch (IOException e) { + throw new CrunchRuntimeException("Error serializing writable comparable codes", e); + } + for (MapFn fn : fns) { 
fn.configure(conf); } @@ -327,9 +429,11 @@ public class Writables { @Override public void initialize() { + for (MapFn fn : fns) { fn.initialize(); } + // The rest of the methods allocate new // objects each time. However this one // uses Tuple.tuplify which does a copy @@ -357,23 +461,28 @@ public class Writables { */ private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> { - private transient TupleWritable writable; - private transient BytesWritable[] values; - private final List<MapFn> fns; - private final List<Class<Writable>> writableClasses; - + + private transient int[] written; + private transient Writable[] values; + public TupleTWMapFn(PType<?>... ptypes) { this.fns = Lists.newArrayList(); - this.writableClasses = Lists.newArrayList(); for (PType<?> ptype : ptypes) { fns.add(ptype.getOutputMapFn()); - writableClasses.add(((WritableType) ptype).getSerializationClass()); } + + this.written = new int[fns.size()]; + this.values = new Writable[fns.size()]; } @Override public void configure(Configuration conf) { + try { + serializeWritableComparableCodes(conf); + } catch (IOException e) { + throw new CrunchRuntimeException("Error serializing writable comparable codes", e); + } for (MapFn fn : fns) { fn.configure(conf); } @@ -388,26 +497,31 @@ public class Writables { @Override public void initialize() { - this.values = new BytesWritable[fns.size()]; - this.writable = new TupleWritable(values); - this.writable.setWritableClasses(writableClasses); for (MapFn fn : fns) { fn.initialize(); } + this.written = new int[fns.size()]; + this.values = new Writable[fns.size()]; } @Override public TupleWritable map(Tuple input) { - writable.clearWritten(); + Arrays.fill(written, (byte) 0); + Arrays.fill(values, null); for (int i = 0; i < input.size(); i++) { Object value = input.get(i); if (value != null) { - writable.setWritten(i); Writable w = (Writable) fns.get(i).map(value); - values[i] = new BytesWritable(WritableUtils.toByteArray(w)); + if 
(WRITABLE_CODES.inverse().containsKey(w.getClass())) { + values[i] = w; + written[i] = WRITABLE_CODES.inverse().get(w.getClass()); + } else { + values[i] = new BytesWritable(WritableUtils.toByteArray(w)); + written[i] = 1; // code for BytesWritable + } } } - return writable; + return new TupleWritable(values, written); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d053d262/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java index e8727c3..aee185a 100644 --- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java @@ -24,6 +24,7 @@ import org.apache.crunch.types.writable.TupleWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.junit.Before; import org.junit.Test; @@ -41,7 +42,7 @@ public class TupleWritablePartitionerTest { public void testGetPartition() { IntWritable intWritable = new IntWritable(3); BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable)); - TupleWritable key = new TupleWritable(new BytesWritable[] { bw }); + TupleWritable key = new TupleWritable(new Writable[] { bw }); assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5)); assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/d053d262/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java index b1f4107..3a6fc18 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java @@ -19,7 +19,6 @@ package org.apache.crunch.types.writable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import java.io.DataInput; @@ -123,13 +122,7 @@ public class WritablesTest { public void testPairs() throws Exception { Pair<String, String> j = Pair.of("a", "b"); Text[] t = new Text[] { new Text("a"), new Text("b"), }; - BytesWritable[] b = new BytesWritable[t.length]; - for (int i = 0; i < t.length; i++) { - b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); - } - TupleWritable w = new TupleWritable(b); - w.setWritten(0); - w.setWritten(1); + TupleWritable w = new TupleWritable(t); testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), j, w); } @@ -153,14 +146,7 @@ public class WritablesTest { public void testTriples() throws Exception { Tuple3 j = Tuple3.of("a", "b", "c"); Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), }; - BytesWritable[] b = new BytesWritable[t.length]; - for (int i = 0; i < t.length; i++) { - b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); - } - TupleWritable w = new TupleWritable(b); - w.setWritten(0); - w.setWritten(1); - w.setWritten(2); + TupleWritable w = new TupleWritable(t); WritableType<?, ?> wt = Writables.triples(Writables.strings(), Writables.strings(), Writables.strings()); testInputOutputFn(wt, j, w); } @@ -170,15 +156,7 @@ public class WritablesTest { public void testQuads() throws Exception { Tuple4 j = Tuple4.of("a", "b", "c", "d"); Text[] t = new Text[] { new Text("a"), new 
Text("b"), new Text("c"), new Text("d"), }; - BytesWritable[] b = new BytesWritable[t.length]; - for (int i = 0; i < t.length; i++) { - b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); - } - TupleWritable w = new TupleWritable(b); - w.setWritten(0); - w.setWritten(1); - w.setWritten(2); - w.setWritten(3); + TupleWritable w = new TupleWritable(t); WritableType<?, ?> wt = Writables.quads(Writables.strings(), Writables.strings(), Writables.strings(), Writables.strings()); testInputOutputFn(wt, j, w); @@ -189,16 +167,7 @@ public class WritablesTest { TupleN j = new TupleN("a", "b", "c", "d", "e"); Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), new Text("e"), }; - BytesWritable[] b = new BytesWritable[t.length]; - for (int i = 0; i < t.length; i++) { - b[i] = new BytesWritable(WritableUtils.toByteArray(t[i])); - } - TupleWritable w = new TupleWritable(b); - w.setWritten(0); - w.setWritten(1); - w.setWritten(2); - w.setWritten(3); - w.setWritten(4); + TupleWritable w = new TupleWritable(t); WritableType<?, ?> wt = Writables.tuples(Writables.strings(), Writables.strings(), Writables.strings(), Writables.strings(), Writables.strings()); testInputOutputFn(wt, j, w);
