Updated Branches: refs/heads/master bf5cea2fe -> 092ef5d01
CRUNCH-126: Add support for multiple input HBase tables. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/092ef5d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/092ef5d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/092ef5d0 Branch: refs/heads/master Commit: 092ef5d017470828cf6b1866885b7586762e114a Parents: bf5cea2 Author: Josh Wills <[email protected]> Authored: Thu Dec 6 11:44:14 2012 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Dec 11 08:20:33 2012 -0800 ---------------------------------------------------------------------- .../apache/crunch/io/hbase/WordCountHBaseIT.java | 45 +++++- .../apache/crunch/io/hbase/HBaseSourceTarget.java | 27 +++- .../org/apache/crunch/io/hbase/HBaseTarget.java | 7 +- .../crunch/impl/mr/run/CrunchInputFormat.java | 3 +- .../apache/crunch/impl/mr/run/CrunchInputs.java | 2 +- .../java/org/apache/crunch/io/InputBundle.java | 118 +++++++++++++++ .../org/apache/crunch/io/avro/AvroFileSource.java | 2 +- .../org/apache/crunch/io/impl/FileSourceImpl.java | 7 +- .../apache/crunch/io/impl/FileTableSourceImpl.java | 1 + .../org/apache/crunch/io/impl/InputBundle.java | 114 -------------- .../org/apache/crunch/io/text/NLineFileSource.java | 2 +- .../apache/crunch/io/text/TextFileTableSource.java | 2 +- 12 files changed, 193 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index 51abdaa..a46369e 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -26,12 +26,14 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.util.List; import java.util.Random; import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; @@ -39,7 +41,6 @@ import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.hbase.HBaseSourceTarget; import org.apache.crunch.io.hbase.HBaseTarget; -import org.apache.crunch.lib.Aggregate; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.writable.Writables; @@ -64,9 +65,24 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; public class WordCountHBaseIT { + + static class StringifyFn extends MapFn<Pair<ImmutableBytesWritable, Pair<Result, Result>>, String> { + @Override + public String map(Pair<ImmutableBytesWritable, Pair<Result, Result>> input) { + byte[] firstStrBytes = input.second().first().getValue(WORD_COLFAM, null); + byte[] secondStrBytes = input.second().second().getValue(WORD_COLFAM, null); + if (firstStrBytes != null && secondStrBytes != null) { + return Joiner.on(',').join(new String(firstStrBytes), new String(secondStrBytes)); + } + return ""; + } + } + @Rule public TemporaryPath tmpDir = TemporaryPaths.create(); @@ -77,7 +93,7 @@ public class WordCountHBaseIT { @SuppressWarnings("serial") public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) { - PTable<String, Long> counts = Aggregate.count(words.parallelDo( + PTable<String, Long> counts = words.parallelDo( new DoFn<Pair<ImmutableBytesWritable, Result>, String>() { @Override public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) { @@ -86,7 +102,7 @@ public class WordCountHBaseIT { emitter.emit(Bytes.toString(word)); } } - }, words.getTypeFamily().strings())); + }, words.getTypeFamily().strings()).count(); return counts.parallelDo("convert to put", new DoFn<Pair<String, Long>, Put>() { @Override @@ -200,7 +216,8 @@ public class WordCountHBaseIT { int postFix = Math.abs(rand.nextInt()); String inputTableName = "crunch_words_" + postFix; String outputTableName = "crunch_counts_" + postFix; - + String joinTableName = "crunch_join_words_" + postFix; + try { HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM); @@ -213,13 +230,29 @@ public class WordCountHBaseIT { Scan scan = new Scan(); scan.addColumn(WORD_COLFAM, null); HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan); - PTable<ImmutableBytesWritable, Result> shakespeare = pipeline.read(source); - pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName)); + PTable<ImmutableBytesWritable, Result> words = pipeline.read(source); + pipeline.write(wordCount(words), new HBaseTarget(outputTableName)); pipeline.done(); assertIsLong(outputTable, "cat", 2); assertIsLong(outputTable, "dog", 1); + // verify we can do joins. + HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM); + key = 0; + key = put(joinTable, key, "zebra"); + key = put(joinTable, key, "donkey"); + key = put(joinTable, key, "bird"); + key = put(joinTable, key, "horse"); + + Scan joinScan = new Scan(); + joinScan.addColumn(WORD_COLFAM, null); + PTable<ImmutableBytesWritable, Result> other = pipeline.read(FromHBase.table(joinTableName, joinScan)); + PCollection<String> joined = words.join(other).parallelDo(new StringifyFn(), Writables.strings()); + assertEquals(ImmutableSet.of("cat,zebra", "cat,donkey", "dog,bird"), + ImmutableSet.copyOf(joined.materialize())); + pipeline.done(); + //verify HBaseTarget supports deletes. Scan clearScan = new Scan(); clearScan.addColumn(COUNTS_COLFAM, null); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index fcb9de1..8e6a3fb 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -25,11 +25,14 @@ import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.Pair; import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSource; +import org.apache.crunch.impl.mr.run.CrunchInputs; import org.apache.crunch.impl.mr.run.CrunchMapper; +import org.apache.crunch.io.InputBundle; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -46,10 +49,18 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair< Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class)); protected Scan scan; - + private InputBundle<TableInputFormat> inputBundle; + public HBaseSourceTarget(String table, Scan scan) { super(table); this.scan = scan; + try { + this.inputBundle = new InputBundle<TableInputFormat>(TableInputFormat.class) + .set(TableInputFormat.INPUT_TABLE, table) + .set(TableInputFormat.SCAN, convertScanToString(scan)); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override @@ -69,7 +80,7 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair< } HBaseSourceTarget o = (HBaseSourceTarget) other; // XXX scan does not have equals method - return table.equals(o.table) && scan.equals(o.scan); + return inputBundle.equals(o.inputBundle); } @Override @@ -85,12 +96,16 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair< @Override public void configureSource(Job job, int inputId) throws IOException { Configuration conf = job.getConfiguration(); - job.setInputFormatClass(TableInputFormat.class); - job.setMapperClass(CrunchMapper.class); HBaseConfiguration.addHbaseResources(conf); - conf.set(TableInputFormat.INPUT_TABLE, table); - conf.set(TableInputFormat.SCAN, convertScanToString(scan)); TableMapReduceUtil.addDependencyJars(job); + if (inputId == -1) { + job.setMapperClass(CrunchMapper.class); + job.setInputFormatClass(inputBundle.getInputFormatClass()); + inputBundle.configure(job.getConfiguration()); + } else { + Path dummy = new Path("/hbase/" + table); + CrunchInputs.addInputPath(job, dummy, inputBundle, inputId); + } } static String convertScanToString(Scan scan) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index 48593b8..b8b9c14 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -82,7 +82,8 @@ public class HBaseTarget implements MapReduceTarget { final Configuration conf = job.getConfiguration(); HBaseConfiguration.addHbaseResources(conf); conf.set(TableOutputFormat.OUTPUT_TABLE, table); - + Class<?> typeClass = ptype.getTypeClass(); // Either Put or Delete + try { TableMapReduceUtil.addDependencyJars(job); FileOutputFormat.setOutputPath(job, outputPath); @@ -93,12 +94,12 @@ public class HBaseTarget implements MapReduceTarget { if (null == name) { job.setOutputFormatClass(TableOutputFormat.class); job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(Put.class); + job.setOutputValueClass(typeClass); } else { CrunchMultipleOutputs.addNamedOutput(job, name, TableOutputFormat.class, ImmutableBytesWritable.class, - Put.class); + typeClass); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java index 7e91bdd..bca7770 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import org.apache.crunch.io.impl.InputBundle; +import org.apache.crunch.io.InputBundle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; @@ -47,6 +47,7 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> { for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) { InputBundle inputBundle = entry.getKey(); Job jobCopy = new Job(conf); + inputBundle.configure(jobCopy.getConfiguration()); InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getInputFormatClass(), jobCopy.getConfiguration()); for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java index 93868fc..63eba61 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java @@ -20,7 +20,7 @@ package org.apache.crunch.impl.mr.run; import java.util.List; import java.util.Map; -import org.apache.crunch.io.impl.InputBundle; +import org.apache.crunch.io.InputBundle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/InputBundle.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/InputBundle.java b/crunch/src/main/java/org/apache/crunch/io/InputBundle.java new file mode 100644 index 0000000..ed737d7 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/InputBundle.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Map; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; + +import com.google.common.collect.Maps; + +/** + * A combination of an InputFormat and any configuration information that + * InputFormat needs to run properly. InputBundles allow us to let different + * InputFormats act as if they are the only InputFormat that exists in a + * particular MapReduce job. + */ +public class InputBundle<K extends InputFormat> implements Serializable { + + private Class<K> inputFormatClass; + private Map<String, String> extraConf; + + public static <T extends InputFormat> InputBundle<T> fromSerialized(String serialized) { + ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized)); + try { + ObjectInputStream ois = new ObjectInputStream(bais); + InputBundle<T> bundle = (InputBundle<T>) ois.readObject(); + ois.close(); + return bundle; + } catch (IOException e) { + throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + public static <T extends InputFormat> InputBundle<T> of(Class<T> inputFormatClass) { + return new InputBundle<T>(inputFormatClass); + } + + public InputBundle(Class<K> inputFormatClass) { + this.inputFormatClass = inputFormatClass; + this.extraConf = Maps.newHashMap(); + } + + public InputBundle<K> set(String key, String value) { + this.extraConf.put(key, value); + return this; + } + + public Class<K> getInputFormatClass() { + return inputFormatClass; + } + + public Map<String, String> getExtraConfiguration() { + return extraConf; + } + + public Configuration configure(Configuration conf) { + for (Map.Entry<String, String> e : extraConf.entrySet()) { + conf.set(e.getKey(), e.getValue()); + } + return conf; + } + + public String serialize() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(this); + oos.close(); + return Base64.encodeBase64String(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public String getName() { + return inputFormatClass.getSimpleName(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof InputBundle)) { + return false; + } + InputBundle<K> oib = (InputBundle<K>) other; + return inputFormatClass.equals(oib.inputFormatClass) && extraConf.equals(oib.extraConf); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 32b8054..0e9a6ee 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -21,9 +21,9 @@ import java.io.IOException; import org.apache.avro.mapred.AvroJob; import org.apache.crunch.io.CompositePathIterable; +import org.apache.crunch.io.InputBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; -import org.apache.crunch.io.impl.InputBundle; import org.apache.crunch.types.avro.AvroInputFormat; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index d3e9c6f..4038b60 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.Source; import org.apache.crunch.impl.mr.run.CrunchInputs; +import org.apache.crunch.io.InputBundle; import org.apache.crunch.io.SourceTargetHelper; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; @@ -38,15 +39,15 @@ public abstract class FileSourceImpl<T> implements Source<T> { protected final Path path; protected final PType<T> ptype; - protected final InputBundle inputBundle; + protected final InputBundle<?> inputBundle; public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) { this.path = path; this.ptype = ptype; - this.inputBundle = new InputBundle(inputFormatClass); + this.inputBundle = InputBundle.of(inputFormatClass); } - public FileSourceImpl(Path path, PType<T> ptype, InputBundle inputBundle) { + public FileSourceImpl(Path path, PType<T> ptype, InputBundle<?> inputBundle) { this.path = path; this.ptype = ptype; this.inputBundle = inputBundle; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java index c7ea767..7d63cc0 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java @@ -19,6 +19,7 @@ package org.apache.crunch.io.impl; import org.apache.crunch.Pair; import org.apache.crunch.TableSource; +import org.apache.crunch.io.InputBundle; import org.apache.crunch.types.PTableType; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java b/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java deleted file mode 100644 index f92e70a..0000000 --- a/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.impl; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.Map; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputFormat; - -import com.google.common.collect.Maps; - -/** - * A combination of an InputFormat and any configuration information that - * InputFormat needs to run properly. InputBundles allow us to let different - * InputFormats pretend as if they are the only InputFormat that exists in a - * particular MapReduce job. - */ -public class InputBundle implements Serializable { - - private Class<? extends InputFormat> inputFormatClass; - private Map<String, String> extraConf; - - public static InputBundle fromSerialized(String serialized) { - ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized)); - try { - ObjectInputStream ois = new ObjectInputStream(bais); - InputBundle bundle = (InputBundle) ois.readObject(); - ois.close(); - return bundle; - } catch (IOException e) { - throw new RuntimeException(e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - public InputBundle(Class<? extends InputFormat> inputFormatClass) { - this.inputFormatClass = inputFormatClass; - this.extraConf = Maps.newHashMap(); - } - - public InputBundle set(String key, String value) { - this.extraConf.put(key, value); - return this; - } - - public Class<? extends InputFormat> getInputFormatClass() { - return inputFormatClass; - } - - public Map<String, String> getExtraConfiguration() { - return extraConf; - } - - public Configuration configure(Configuration conf) { - for (Map.Entry<String, String> e : extraConf.entrySet()) { - conf.set(e.getKey(), e.getValue()); - } - return conf; - } - - public String serialize() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(this); - oos.close(); - return Base64.encodeBase64String(baos.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public String getName() { - return inputFormatClass.getSimpleName(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof InputBundle)) { - return false; - } - InputBundle oib = (InputBundle) other; - return inputFormatClass.equals(oib.inputFormatClass) && extraConf.equals(oib.extraConf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java index d88ef4a..ad3414a 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java @@ -20,9 +20,9 @@ package org.apache.crunch.io.text; import java.io.IOException; import org.apache.crunch.io.CompositePathIterable; +import org.apache.crunch.io.InputBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileSourceImpl; -import org.apache.crunch.io.impl.InputBundle; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java index c94676a..23cda77 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java @@ -21,9 +21,9 @@ import java.io.IOException; import org.apache.crunch.Pair; import org.apache.crunch.io.CompositePathIterable; +import org.apache.crunch.io.InputBundle; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.impl.FileTableSourceImpl; -import org.apache.crunch.io.impl.InputBundle; import org.apache.crunch.types.PTableType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;
