Repository: crunch
Updated Branches:
  refs/heads/master 3477ea431 -> 85c2642ca
CRUNCH-486: Properly configure Writable serialization codes when sorting using TupleWritable keys.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/85c2642c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/85c2642c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/85c2642c

Branch: refs/heads/master
Commit: 85c2642ca6a759a5d53fc725594ec336edeb0f49
Parents: 3477ea4
Author: Josh Wills <[email protected]>
Authored: Fri Jan 9 13:46:12 2015 -0800
Committer: Josh Wills <[email protected]>
Committed: Mon Jan 12 10:30:10 2015 -0800

----------------------------------------------------------------------
 .../crunch/types/writable/TupleWritable.java    | 22 +++++++++++++++++---
 .../writable/WritableGroupedTableType.java      |  4 ++++
 .../apache/crunch/types/writable/Writables.java |  7 +++++--
 3 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/85c2642c/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index 12b2fb9..bdd3ad9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -255,7 +256,7 @@ public class TupleWritable extends 
Configured implements WritableComparable<Tupl
     return this.size() - that.size();
   }
 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator implements Configurable {
 
     private static final Comparator INSTANCE = new Comparator();
 
@@ -263,11 +264,26 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
       return INSTANCE;
     }
 
-    private Comparator() {
+    public Comparator() {
       super(TupleWritable.class);
     }
 
     @Override
+    public void setConf(Configuration conf) {
+      if (conf == null) return;
+      try {
+        Writables.reloadWritableComparableCodes(conf);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException("Error reloading writable comparable codes", e);
+      }
+    }
+
+    @Override
+    public Configuration getConf() {
+      return null;
+    }
+
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       DataInputBuffer buffer1 = new DataInputBuffer();
       DataInputBuffer buffer2 = new DataInputBuffer();
@@ -342,4 +358,4 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl
     // without any deserialization overhead.
     WritableComparator.define(TupleWritable.class, Comparator.getInstance());
   }
-}
\ No newline at end of file
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/85c2642c/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
index 8823541..3167591 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -81,5 +81,9 @@ class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
     WritableType valueType = (WritableType) tableType.getValueType();
     job.setMapOutputKeyClass(keyType.getSerializationClass());
     job.setMapOutputValueClass(valueType.getSerializationClass());
+    if (options.getSortComparatorClass() == null &&
+        TupleWritable.class.equals(keyType.getSerializationClass())) {
+      job.setSortComparatorClass(TupleWritable.Comparator.class);
+    }
   }
 }


http://git-wip-us.apache.org/repos/asf/crunch/blob/85c2642c/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index 5b5411b..23775ed 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -119,8 +119,11 @@ public class Writables {
    * @param code The unique registration code for the class, which must be greater than or equal to 8
    */
   public static void registerComparable(Class<? 
extends WritableComparable> clazz, int code) {
-    if (WRITABLE_CODES.containsKey(code)) {
-      throw new IllegalArgumentException("Already have writable class assigned to code = " + code);
+    if (WRITABLE_CODES.containsKey(code) && !clazz.equals(WRITABLE_CODES.get(code))) {
+      throw new IllegalArgumentException(String.format(
+          "Already have writable class %s assigned to code = %d",
+          clazz,
+          code));
     }
     WRITABLE_CODES.put(code, clazz);
   }
