Updated Branches: refs/heads/master 2aa692e52 -> a724ddce5
CRUNCH-298: Slim down FormatBundle serialization Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a724ddce Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a724ddce Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a724ddce Branch: refs/heads/master Commit: a724ddce5686270782cad031b071f74084ccb534 Parents: 2aa692e Author: Josh Wills <[email protected]> Authored: Tue Nov 19 17:44:21 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Nov 19 17:44:21 2013 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/io/CrunchInputs.java | 2 +- .../java/org/apache/crunch/io/CrunchOutputs.java | 3 +-- .../java/org/apache/crunch/io/FormatBundle.java | 17 ++++++++--------- 3 files changed, 10 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/a724ddce/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java index d154db2..c1a0eef 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java @@ -55,7 +55,7 @@ public class CrunchInputs { Configuration conf = job.getConfiguration(); for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) { List<String> fields = Lists.newArrayList(SPLITTER.split(input)); - FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), InputFormat.class); + FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), job.getConfiguration()); if (!formatNodeMap.containsKey(inputBundle)) { formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a724ddce/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java index ccf4fb5..cd1ebce 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java @@ -83,8 +83,7 @@ public class CrunchOutputs<K, V> { for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) { List<String> fields = Lists.newArrayList(SPLITTER.split(input)); String name = fields.get(0); - FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), - OutputFormat.class); + FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), conf); try { Class<?> keyClass = Class.forName(fields.get(2)); Class<?> valueClass = Class.forName(fields.get(3)); http://git-wip-us.apache.org/repos/asf/crunch/blob/a724ddce/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java index aa84fee..3259aaf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java @@ -20,7 +20,9 @@ package org.apache.crunch.io; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -52,17 +54,15 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable { private Map<String, String> extraConf; private Configuration conf; - public static <T> FormatBundle<T> fromSerialized(String serialized, Class<T> clazz) { + public static <T> FormatBundle<T> fromSerialized(String serialized, Configuration conf) { ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized)); try { - ObjectInputStream ois = new ObjectInputStream(bais); - FormatBundle<T> bundle = (FormatBundle<T>) ois.readObject(); - ois.close(); + FormatBundle<T> bundle = new FormatBundle<T>(); + bundle.setConf(conf); + bundle.readFields(new DataInputStream(bais)); return bundle; } catch (IOException e) { throw new RuntimeException(e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); } } @@ -102,9 +102,8 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable { public String serialize() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(this); - oos.close(); + DataOutputStream dos = new DataOutputStream(baos); + write(dos); return Base64.encodeBase64String(baos.toByteArray()); } catch (IOException e) { throw new RuntimeException(e);
