Updated Branches: refs/heads/master 6cc8d8ec8 -> 855b0ddcb
CRUNCH-167: Re-write the sorting strategy for tuples to only select the fields that we are using to sort on as the keys, and re-implement the wrapper functions to compress all of the different tuple sort methods (Pairs, Trips, etc.) into a single method. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/855b0ddc Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/855b0ddc Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/855b0ddc Branch: refs/heads/master Commit: 855b0ddcb738611abfd9bed445403f18b63d2453 Parents: 6cc8d8e Author: Josh Wills <[email protected]> Authored: Thu Feb 21 16:35:49 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Sun Feb 24 13:02:55 2013 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/SortByValueIT.java | 81 ++++ .../src/it/java/org/apache/crunch/lib/SortIT.java | 16 +- crunch/src/it/resources/sort_by_value.txt | 5 + .../src/main/java/org/apache/crunch/lib/Sort.java | 353 ++++++++------- 4 files changed, 278 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java new file mode 100644 index 0000000..c313351 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/SortByValueIT.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.lib.Sort.ColumnOrder; +import org.apache.crunch.lib.Sort.Order; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +/** + * + */ +public class SortByValueIT { + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + private static class SplitFn extends MapFn<String, Pair<String, Long>> { + private String sep; + + public SplitFn(String sep) { + this.sep = sep; + } + + @Override + public Pair<String, Long> map(String input) { + String[] pieces = input.split(sep); + return Pair.of(pieces[0], Long.valueOf(pieces[1])); + } + } + + @Test + public void testSortByValueWritables() throws Exception { + run(new MRPipeline(SortByValueIT.class), WritableTypeFamily.getInstance()); + } + + @Test + public void testSortByValueAvro() throws Exception { + run(new MRPipeline(SortByValueIT.class), AvroTypeFamily.getInstance()); + } + + public void run(Pipeline pipeline, PTypeFamily ptf) throws Exception { + String sbv = tmpDir.copyResourceFileName("sort_by_value.txt"); + PTable<String, Long> letterCounts = pipeline.read(From.textFile(sbv)).parallelDo(new SplitFn("\t"), + ptf.tableOf(ptf.strings(), ptf.longs())); + PCollection<Pair<String, Long>> sorted = Sort.sortPairs(letterCounts, new ColumnOrder(2, Order.DESCENDING)); + assertEquals( + ImmutableList.of(Pair.of("C", 3L), Pair.of("A", 2L), Pair.of("D", 2L), Pair.of("B", 1L), Pair.of("E", 1L)), + ImmutableList.copyOf(sorted.materialize())); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/java/org/apache/crunch/lib/SortIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java index 3ea31ca..bad4864 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java @@ -49,7 +49,6 @@ import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -78,7 +77,7 @@ public class SortIT implements Serializable { } @Test - public void testWritableSortSecondDescFirstDesc() throws Exception { + public void testWritableSortSecondDescFirstAsc() throws Exception { runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text"); } @@ -117,14 +116,13 @@ public class SortIT implements Serializable { } @Test - public void testAvroSortPairAscAsc() throws Exception { + public void testAvroSortPairAscDesc() throws Exception { runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text"); } @Test - @Ignore("Avro sorting only works in field order at the moment") - public void testAvroSortPairSecondAscFirstDesc() throws Exception { + public void testAvroSortPairSecondDescFirstAsc() throws Exception { runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text"); } @@ -220,15 +218,15 @@ public class SortIT implements Serializable { String inputPath = tmpDir.copyResourceFileName("docs.txt"); PCollection<String> input = pipeline.readTextFile(inputPath); - PCollection<Pair<String, String>> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() { + PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() { @Override public void process(String input, Emitter<Pair<String, String>> emitter) { String[] split = input.split("[\t]+"); emitter.emit(Pair.of(split[0], split[1])); } - }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings())); + }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings())); PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second); - Iterable<Pair<String, String>> lines = sorted.materialize(); + List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize()); Pair<String, String> l = lines.iterator().next(); assertEquals(firstField, l.first()); assertEquals(secondField, l.second()); @@ -250,7 +248,7 @@ public class SortIT implements Serializable { } }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings())); PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third); - Iterable<Tuple3<String, String, String>> lines = sorted.materialize(); + List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize()); Tuple3<String, String, String> l = lines.iterator().next(); assertEquals(firstField, l.first()); assertEquals(secondField, l.second()); http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/it/resources/sort_by_value.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/sort_by_value.txt b/crunch/src/it/resources/sort_by_value.txt new file mode 100644 index 0000000..73f7d11 --- /dev/null +++ b/crunch/src/it/resources/sort_by_value.txt @@ -0,0 +1,5 @@ +A 2 +B 1 +C 3 +D 2 +E 1 http://git-wip-us.apache.org/repos/asf/crunch/blob/855b0ddc/crunch/src/main/java/org/apache/crunch/lib/Sort.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sort.java b/crunch/src/main/java/org/apache/crunch/lib/Sort.java index f2729a2..cca5a79 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Sort.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Sort.java @@ -18,29 +18,33 @@ package org.apache.crunch.lib; import java.util.Arrays; -import java.util.BitSet; import java.util.List; import java.util.UUID; import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryData; import org.apache.avro.reflect.ReflectData; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; import org.apache.crunch.GroupingOptions.Builder; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.TupleFactory; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.TupleWritable; import org.apache.crunch.types.writable.WritableTypeFamily; import org.apache.hadoop.conf.Configurable; @@ -54,7 +58,6 @@ import org.apache.hadoop.mapred.JobConf; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -120,13 +123,7 @@ public class Sort { emitter.emit(Pair.of(input, (Void) null)); } }, type); - PTable<T, Void> sortedPt = pt.groupByKey(options).ungroup(); - return sortedPt.parallelDo("sort-post", new DoFn<Pair<T, Void>, T>() { - @Override - public void process(Pair<T, Void> input, Emitter<T> emitter) { - emitter.emit(input.first()); - } - }, collection.getPType()); + return pt.groupByKey(options).ungroup().keys(); } /** @@ -151,6 +148,156 @@ public class Sort { return table.groupByKey(options).ungroup(); } + static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> { + private final int index; + + public SingleKeyFn(int index) { + this.index = index; + } + + @Override + public K map(V input) { + return (K) input.get(index); + } + } + + static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> { + private final int[] indices; + private final TupleFactory tupleFactory; + + public TupleKeyFn(int[] indices, TupleFactory tupleFactory) { + this.indices = indices; + this.tupleFactory = tupleFactory; + } + + @Override + public K map(V input) { + Object[] values = new Object[indices.length]; + for (int i = 0; i < indices.length; i++) { + values[i] = input.get(indices[i]); + } + return (K) tupleFactory.makeTuple(values); + } + } + + static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> { + + private final int[] indices; + private final String schemaJson; + private transient Schema schema; + + public AvroGenericFn(int[] indices, Schema schema) { + this.indices = indices; + this.schemaJson = schema.toString(); + } + + @Override + public void initialize() { + this.schema = (new Schema.Parser()).parse(schemaJson); + } + + @Override + public GenericRecord map(V input) { + GenericRecord rec = new GenericData.Record(schema); + for (int i = 0; i < indices.length; i++) { + rec.put(i, input.get(indices[i])); + } + return rec; + } + } + + static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) { + // Guarantee each tuple schema has a globally unique name + String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x'); + Schema schema = Schema.createRecord(tupleName, "", "crunch", false); + List<Schema.Field> fields = Lists.newArrayList(); + AvroType<S> parentAvroType = (AvroType<S>) ptype; + Schema parentAvroSchema = parentAvroType.getSchema(); + + for (int index = 0; index < orders.length; index++) { + ColumnOrder columnOrder = orders[index]; + AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index); + Schema fieldSchema = atype.getSchema(); + String fieldName = parentAvroSchema.getFields().get(index).name(); + // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable + // Text instances: making this consistent + Schema.Field.Order order = columnOrder.order == Order.DESCENDING ? Schema.Field.Order.DESCENDING : + Schema.Field.Order.ASCENDING; + fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order)); + } + schema.setFields(fields); + return schema; + } + + static class KeyExtraction<V extends Tuple> { + + private PType<V> ptype; + private final ColumnOrder[] columnOrder; + private final int[] cols; + + private MapFn<V, Object> byFn; + private PType<Object> keyPType; + + public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) { + this.ptype = ptype; + this.columnOrder = columnOrder; + this.cols = new int[columnOrder.length]; + for (int i = 0; i < columnOrder.length; i++) { + cols[i] = columnOrder[i].column - 1; + } + init(); + } + + private void init() { + List<PType> pt = ptype.getSubTypes(); + PTypeFamily ptf = ptype.getFamily(); + if (cols.length == 1) { + byFn = new SingleKeyFn(cols[0]); + keyPType = pt.get(cols[0]); + } else { + TupleFactory tf = null; + switch (cols.length) { + case 2: + tf = TupleFactory.PAIR; + keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1])); + break; + case 3: + tf = TupleFactory.TUPLE3; + keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2])); + break; + case 4: + tf = TupleFactory.TUPLE4; + keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3])); + break; + default: + PType[] pts = new PType[cols.length]; + for (int i = 0; i < pts.length; i++) { + pts[i] = pt.get(cols[i]); + } + tf = TupleFactory.TUPLEN; + keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts); + } + + if (ptf == AvroTypeFamily.getInstance()) { + Schema s = createOrderedTupleSchema(keyPType, columnOrder); + keyPType = (PType<Object>) (PType<?>) Avros.generics(s); + byFn = new AvroGenericFn(cols, s); + } else { + byFn = new TupleKeyFn(cols, tf); + } + } + + } + + public MapFn<V, Object> getByFn() { + return byFn; + } + + public PType<Object> getKeyType() { + return keyPType; + } + } + /** * Sorts the {@link PCollection} of {@link Pair}s using the specified column * ordering. @@ -159,28 +306,7 @@ public class Sort { */ public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders) { - // put U and V into a pair/tuple in the key so we can do grouping and - // sorting - PTypeFamily tf = collection.getTypeFamily(); - PType<Pair<U, V>> pType = collection.getPType(); - @SuppressWarnings("unchecked") - PTableType<Pair<U, V>, Void> type = tf.tableOf(tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)), - tf.nulls()); - PTable<Pair<U, V>, Void> pt = collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() { - @Override - public void process(Pair<U, V> input, Emitter<Pair<Pair<U, V>, Void>> emitter) { - emitter.emit(Pair.of(input, (Void) null)); - } - }, type); - Configuration conf = collection.getPipeline().getConfiguration(); - GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders); - PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup(); - return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>, Void>, Pair<U, V>>() { - @Override - public void process(Pair<Pair<U, V>, Void> input, Emitter<Pair<U, V>> emitter) { - emitter.emit(input.first()); - } - }, collection.getPType()); + return sortTuples(collection, columnOrders); } /** @@ -191,27 +317,7 @@ public class Sort { */ public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders) { - PTypeFamily tf = collection.getTypeFamily(); - PType<Tuple3<V1, V2, V3>> pType = collection.getPType(); - @SuppressWarnings("unchecked") - PTableType<Tuple3<V1, V2, V3>, Void> type = tf.tableOf( - tf.triples(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2)), tf.nulls()); - PTable<Tuple3<V1, V2, V3>, Void> pt = collection.parallelDo( - new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() { - @Override - public void process(Tuple3<V1, V2, V3> input, Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) { - emitter.emit(Pair.of(input, (Void) null)); - } - }, type); - Configuration conf = collection.getPipeline().getConfiguration(); - GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders); - PTable<Tuple3<V1, V2, V3>, Void> sortedPt = pt.groupByKey(options).ungroup(); - return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>, Void>, Tuple3<V1, V2, V3>>() { - @Override - public void process(Pair<Tuple3<V1, V2, V3>, Void> input, Emitter<Tuple3<V1, V2, V3>> emitter) { - emitter.emit(input.first()); - } - }, collection.getPType()); + return sortTuples(collection, columnOrders); } /** @@ -222,27 +328,7 @@ public class Sort { */ public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads( PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) { - PTypeFamily tf = collection.getTypeFamily(); - PType<Tuple4<V1, V2, V3, V4>> pType = collection.getPType(); - @SuppressWarnings("unchecked") - PTableType<Tuple4<V1, V2, V3, V4>, Void> type = tf.tableOf(tf.quads(pType.getSubTypes().get(0), pType.getSubTypes() - .get(1), pType.getSubTypes().get(2), pType.getSubTypes().get(3)), tf.nulls()); - PTable<Tuple4<V1, V2, V3, V4>, Void> pt = collection.parallelDo( - new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() { - @Override - public void process(Tuple4<V1, V2, V3, V4> input, Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) { - emitter.emit(Pair.of(input, (Void) null)); - } - }, type); - Configuration conf = collection.getPipeline().getConfiguration(); - GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders); - PTable<Tuple4<V1, V2, V3, V4>, Void> sortedPt = pt.groupByKey(options).ungroup(); - return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>, Void>, Tuple4<V1, V2, V3, V4>>() { - @Override - public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input, Emitter<Tuple4<V1, V2, V3, V4>> emitter) { - emitter.emit(input.first()); - } - }, collection.getPType()); + return sortTuples(collection, columnOrders); } /** @@ -251,25 +337,14 @@ public class Sort { * * @return a {@link PCollection} representing the sorted collection. */ - public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection, ColumnOrder... columnOrders) { + public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders) { PTypeFamily tf = collection.getTypeFamily(); - PType<TupleN> pType = collection.getPType(); - PTableType<TupleN, Void> type = tf.tableOf(tf.tuples(pType.getSubTypes().toArray(new PType[0])), tf.nulls()); - PTable<TupleN, Void> pt = collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() { - @Override - public void process(TupleN input, Emitter<Pair<TupleN, Void>> emitter) { - emitter.emit(Pair.of(input, (Void) null)); - } - }, type); + PType<T> pType = collection.getPType(); + KeyExtraction<T> ke = new KeyExtraction<T>(pType, columnOrders); + PTable<Object, T> pt = collection.by(ke.getByFn(), ke.getKeyType()); Configuration conf = collection.getPipeline().getConfiguration(); - GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders); - PTable<TupleN, Void> sortedPt = pt.groupByKey(options).ungroup(); - return sortedPt.parallelDo(new DoFn<Pair<TupleN, Void>, TupleN>() { - @Override - public void process(Pair<TupleN, Void> input, Emitter<TupleN> emitter) { - emitter.emit(input.first()); - } - }, collection.getPType()); + GroupingOptions options = buildGroupingOptions(conf, tf, ke.getKeyType(), pType, columnOrders); + return pt.groupByKey(options).ungroup().values(); } // TODO: move to type family? @@ -294,15 +369,23 @@ public class Sort { return builder.build(); } - private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> ptype, - ColumnOrder[] columnOrders) { + private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> keyType, + PType<?> valueType, ColumnOrder[] columnOrders) { Builder builder = GroupingOptions.builder(); if (tf == WritableTypeFamily.getInstance()) { - TupleWritableComparator.configureOrdering(conf, columnOrders); - builder.sortComparatorClass(TupleWritableComparator.class); + if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) { + builder.sortComparatorClass(ReverseWritableComparator.class); + } else { + TupleWritableComparator.configureOrdering(conf, columnOrders); + builder.sortComparatorClass(TupleWritableComparator.class); + } } else if (tf == AvroTypeFamily.getInstance()) { - TupleAvroComparator.configureOrdering(conf, columnOrders, ptype); - builder.sortComparatorClass(TupleAvroComparator.class); + if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) { + AvroType<T> avroType = (AvroType<T>) keyType; + Schema schema = avroType.getSchema(); + conf.set("crunch.schema", schema.toString()); + builder.sortComparatorClass(ReverseAvroComparator.class); + } } else { throw new RuntimeException("Unrecognized type family: " + tf); } @@ -340,7 +423,7 @@ public class Sort { static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> { - Schema schema; + private Schema schema; @Override public void setConf(Configuration conf) { @@ -397,12 +480,11 @@ public class Sort { public int compare(WritableComparable a, WritableComparable b) { TupleWritable ta = (TupleWritable) a; TupleWritable tb = (TupleWritable) b; - for (int i = 0; i < columnOrders.length; i++) { - int index = columnOrders[i].column - 1; + for (int index = 0; index < columnOrders.length; index++) { int order = 1; - if (columnOrders[i].order == Order.ASCENDING) { + if (columnOrders[index].order == Order.ASCENDING) { order = 1; - } else if (columnOrders[i].order == Order.DESCENDING) { + } else if (columnOrders[index].order == Order.DESCENDING) { order = -1; } else { // ignore continue; @@ -431,7 +513,7 @@ public class Sort { } } } - return 0; // ordering using specified columns found no differences + return 0; // ordering using specified cols found no differences } @Override @@ -457,69 +539,4 @@ public class Sort { } } - static class TupleAvroComparator<T> extends Configured implements RawComparator<T> { - - Schema schema; - - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - if (conf != null) { - schema = (new Schema.Parser()).parse(conf.get("crunch.schema")); - } - } - - public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders, PType<S> ptype) { - Schema orderedSchema = createOrderedTupleSchema(ptype, columnOrders); - conf.set("crunch.schema", orderedSchema.toString()); - } - - // TODO: move to Avros - // TODO: need to re-order columns in map output then switch back in the - // reduce - // this will require more extensive changes in Crunch - private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) { - // Guarantee each tuple schema has a globally unique name - String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x'); - Schema schema = Schema.createRecord(tupleName, "", "crunch", false); - List<Schema.Field> fields = Lists.newArrayList(); - AvroType<S> parentAvroType = (AvroType<S>) ptype; - Schema parentAvroSchema = parentAvroType.getSchema(); - - BitSet orderedColumns = new BitSet(); - // First add any fields specified by ColumnOrder - for (ColumnOrder columnOrder : orders) { - int index = columnOrder.column - 1; - AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index); - Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL))); - String fieldName = parentAvroSchema.getFields().get(index).name(); - fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.valueOf(columnOrder.order - .name()))); - orderedColumns.set(index); - } - // Then add remaining fields from the ptypes, with no sort order - for (int i = 0; i < ptype.getSubTypes().size(); i++) { - if (orderedColumns.get(i)) { - continue; - } - AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(i); - Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL))); - String fieldName = parentAvroSchema.getFields().get(i).name(); - fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.IGNORE)); - } - schema.setFields(fields); - return schema; - } - - @Override - public int compare(T o1, T o2) { - return ReflectData.get().compare(o1, o2, schema); - } - - @Override - public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) { - return BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema); - } - - } }
