Updated Branches:
  refs/heads/master bf5cea2fe -> 092ef5d01

CRUNCH-126: Add support for multiple input HBase tables.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/092ef5d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/092ef5d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/092ef5d0

Branch: refs/heads/master
Commit: 092ef5d017470828cf6b1866885b7586762e114a
Parents: bf5cea2
Author: Josh Wills <[email protected]>
Authored: Thu Dec 6 11:44:14 2012 -0800
Committer: Josh Wills <[email protected]>
Committed: Tue Dec 11 08:20:33 2012 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/WordCountHBaseIT.java   |   45 +++++-
 .../apache/crunch/io/hbase/HBaseSourceTarget.java  |   27 +++-
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |    7 +-
 .../crunch/impl/mr/run/CrunchInputFormat.java      |    3 +-
 .../apache/crunch/impl/mr/run/CrunchInputs.java    |    2 +-
 .../java/org/apache/crunch/io/InputBundle.java     |  118 +++++++++++++++
 .../org/apache/crunch/io/avro/AvroFileSource.java  |    2 +-
 .../org/apache/crunch/io/impl/FileSourceImpl.java  |    7 +-
 .../apache/crunch/io/impl/FileTableSourceImpl.java |    1 +
 .../org/apache/crunch/io/impl/InputBundle.java     |  114 --------------
 .../org/apache/crunch/io/text/NLineFileSource.java |    2 +-
 .../apache/crunch/io/text/TextFileTableSource.java |    2 +-
 12 files changed, 193 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 51abdaa..a46369e 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -26,12 +26,14 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.List;
 import java.util.Random;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
@@ -39,7 +41,6 @@ import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.hbase.HBaseSourceTarget;
 import org.apache.crunch.io.hbase.HBaseTarget;
-import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.writable.Writables;
@@ -64,9 +65,24 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.io.ByteStreams;
 
 public class WordCountHBaseIT {
+
+  static class StringifyFn extends MapFn<Pair<ImmutableBytesWritable, Pair<Result, Result>>, String> {
+    @Override
+    public String map(Pair<ImmutableBytesWritable, Pair<Result, Result>> input) {
+      byte[] firstStrBytes = input.second().first().getValue(WORD_COLFAM, null);
+      byte[] secondStrBytes = input.second().second().getValue(WORD_COLFAM, null);
+      if (firstStrBytes != null && secondStrBytes != null) {
+        return Joiner.on(',').join(new String(firstStrBytes), new String(secondStrBytes));
+      }
+      return "";
+    }
+  }
+
   @Rule
   public TemporaryPath tmpDir = TemporaryPaths.create();
 
@@ -77,7 +93,7 @@ public class WordCountHBaseIT {
 
   @SuppressWarnings("serial")
   public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) {
-    PTable<String, Long> counts = Aggregate.count(words.parallelDo(
+    PTable<String, Long> counts = words.parallelDo(
         new DoFn<Pair<ImmutableBytesWritable, Result>, String>() {
           @Override
           public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) {
@@ -86,7 +102,7 @@ public class WordCountHBaseIT {
               emitter.emit(Bytes.toString(word));
             }
           }
-        }, words.getTypeFamily().strings()));
+        }, words.getTypeFamily().strings()).count();
 
     return counts.parallelDo("convert to put", new DoFn<Pair<String, Long>, Put>() {
       @Override
@@ -200,7 +216,8 @@ public class WordCountHBaseIT {
     int postFix = Math.abs(rand.nextInt());
     String inputTableName = "crunch_words_" + postFix;
     String outputTableName = "crunch_counts_" + postFix;
-
+    String joinTableName = "crunch_join_words_" + postFix;
+    
     try {
 
       HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
@@ -213,13 +230,29 @@ public class WordCountHBaseIT {
       Scan scan = new Scan();
       scan.addColumn(WORD_COLFAM, null);
       HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
-      PTable<ImmutableBytesWritable, Result> shakespeare = pipeline.read(source);
-      pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName));
+      PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
+      pipeline.write(wordCount(words), new HBaseTarget(outputTableName));
       pipeline.done();
 
       assertIsLong(outputTable, "cat", 2);
       assertIsLong(outputTable, "dog", 1);
       
+      // verify we can do joins.
+      HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM);
+      key = 0;
+      key = put(joinTable, key, "zebra");
+      key = put(joinTable, key, "donkey");
+      key = put(joinTable, key, "bird");
+      key = put(joinTable, key, "horse");
+      
+      Scan joinScan = new Scan();
+      joinScan.addColumn(WORD_COLFAM, null);
+      PTable<ImmutableBytesWritable, Result> other = pipeline.read(FromHBase.table(joinTableName, joinScan));
+      PCollection<String> joined = words.join(other).parallelDo(new StringifyFn(), Writables.strings());
+      assertEquals(ImmutableSet.of("cat,zebra", "cat,donkey", "dog,bird"),
+          ImmutableSet.copyOf(joined.materialize()));
+      pipeline.done();
+
       //verify HBaseTarget supports deletes.
       Scan clearScan = new Scan();
       clearScan.addColumn(COUNTS_COLFAM, null);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index fcb9de1..8e6a3fb 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -25,11 +25,14 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchInputs;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.io.InputBundle;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -46,10 +49,18 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
       Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
 
   protected Scan scan;
-
+  private InputBundle<TableInputFormat> inputBundle;
+  
   public HBaseSourceTarget(String table, Scan scan) {
     super(table);
     this.scan = scan;
+    try {
+      this.inputBundle = new InputBundle<TableInputFormat>(TableInputFormat.class)
+          .set(TableInputFormat.INPUT_TABLE, table)
+          .set(TableInputFormat.SCAN, convertScanToString(scan));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -69,7 +80,7 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
     }
     HBaseSourceTarget o = (HBaseSourceTarget) other;
     // XXX scan does not have equals method
-    return table.equals(o.table) && scan.equals(o.scan);
+    return inputBundle.equals(o.inputBundle);
   }
 
   @Override
@@ -85,12 +96,16 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
   @Override
   public void configureSource(Job job, int inputId) throws IOException {
     Configuration conf = job.getConfiguration();
-    job.setInputFormatClass(TableInputFormat.class);
-    job.setMapperClass(CrunchMapper.class);
     HBaseConfiguration.addHbaseResources(conf);
-    conf.set(TableInputFormat.INPUT_TABLE, table);
-    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
     TableMapReduceUtil.addDependencyJars(job);
+    if (inputId == -1) {
+      job.setMapperClass(CrunchMapper.class);
+      job.setInputFormatClass(inputBundle.getInputFormatClass());
+      inputBundle.configure(job.getConfiguration());
+    } else {
+      Path dummy = new Path("/hbase/" + table);
+      CrunchInputs.addInputPath(job, dummy, inputBundle, inputId);
+    }
   }
 
   static String convertScanToString(Scan scan) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 48593b8..b8b9c14 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -82,7 +82,8 @@ public class HBaseTarget implements MapReduceTarget {
     final Configuration conf = job.getConfiguration();
     HBaseConfiguration.addHbaseResources(conf);
     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
-
+    Class<?> typeClass = ptype.getTypeClass(); // Either Put or Delete
+    
     try {
       TableMapReduceUtil.addDependencyJars(job);
       FileOutputFormat.setOutputPath(job, outputPath);
@@ -93,12 +94,12 @@ public class HBaseTarget implements MapReduceTarget {
     if (null == name) {
       job.setOutputFormatClass(TableOutputFormat.class);
       job.setOutputKeyClass(ImmutableBytesWritable.class);
-      job.setOutputValueClass(Put.class);
+      job.setOutputValueClass(typeClass);
     } else {
       CrunchMultipleOutputs.addNamedOutput(job, name,
           TableOutputFormat.class,
           ImmutableBytesWritable.class,
-          Put.class);
+          typeClass);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
index 7e91bdd..bca7770 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.io.InputBundle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -47,6 +47,7 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
     for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
       InputBundle inputBundle = entry.getKey();
       Job jobCopy = new Job(conf);
+      inputBundle.configure(jobCopy.getConfiguration());
       InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getInputFormatClass(),
           jobCopy.getConfiguration());
       for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
index 93868fc..63eba61 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
@@ -20,7 +20,7 @@ package org.apache.crunch.impl.mr.run;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.io.InputBundle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/InputBundle.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/InputBundle.java b/crunch/src/main/java/org/apache/crunch/io/InputBundle.java
new file mode 100644
index 0000000..ed737d7
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/InputBundle.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A combination of an InputFormat and any configuration information that
+ * InputFormat needs to run properly. InputBundles allow us to let different
+ * InputFormats act as if they are the only InputFormat that exists in a
+ * particular MapReduce job.
+ */
+public class InputBundle<K extends InputFormat> implements Serializable {
+
+  private Class<K> inputFormatClass;
+  private Map<String, String> extraConf;
+
+  public static <T extends InputFormat> InputBundle<T> fromSerialized(String serialized) {
+    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
+    try {
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      InputBundle<T> bundle = (InputBundle<T>) ois.readObject();
+      ois.close();
+      return bundle;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static <T extends InputFormat> InputBundle<T> of(Class<T> inputFormatClass) {
+    return new InputBundle<T>(inputFormatClass);
+  }
+  
+  public InputBundle(Class<K> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+    this.extraConf = Maps.newHashMap();
+  }
+
+  public InputBundle<K> set(String key, String value) {
+    this.extraConf.put(key, value);
+    return this;
+  }
+
+  public Class<K> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  public Map<String, String> getExtraConfiguration() {
+    return extraConf;
+  }
+
+  public Configuration configure(Configuration conf) {
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      conf.set(e.getKey(), e.getValue());
+    }
+    return conf;
+  }
+
+  public String serialize() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(this);
+      oos.close();
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getName() {
+    return inputFormatClass.getSimpleName();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof InputBundle)) {
+      return false;
+    }
+    InputBundle<K> oib = (InputBundle<K>) other;
+    return inputFormatClass.equals(oib.inputFormatClass) && extraConf.equals(oib.extraConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 32b8054..0e9a6ee 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 
 import org.apache.avro.mapred.AvroJob;
 import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.InputBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.io.impl.InputBundle;
 import org.apache.crunch.types.avro.AvroInputFormat;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index d3e9c6f..4038b60 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.run.CrunchInputs;
+import org.apache.crunch.io.InputBundle;
 import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
@@ -38,15 +39,15 @@ public abstract class FileSourceImpl<T> implements Source<T> {
 
   protected final Path path;
   protected final PType<T> ptype;
-  protected final InputBundle inputBundle;
+  protected final InputBundle<?> inputBundle;
 
   public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
     this.path = path;
     this.ptype = ptype;
-    this.inputBundle = new InputBundle(inputFormatClass);
+    this.inputBundle = InputBundle.of(inputFormatClass);
   }
 
-  public FileSourceImpl(Path path, PType<T> ptype, InputBundle inputBundle) {
+  public FileSourceImpl(Path path, PType<T> ptype, InputBundle<?> inputBundle) {
     this.path = path;
     this.ptype = ptype;
     this.inputBundle = inputBundle;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
index c7ea767..7d63cc0 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.impl;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.TableSource;
+import org.apache.crunch.io.InputBundle;
 import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java b/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java
deleted file mode 100644
index f92e70a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-
-import com.google.common.collect.Maps;
-
-/**
- * A combination of an InputFormat and any configuration information that
- * InputFormat needs to run properly. InputBundles allow us to let different
- * InputFormats pretend as if they are the only InputFormat that exists in a
- * particular MapReduce job.
- */
-public class InputBundle implements Serializable {
-
-  private Class<? extends InputFormat> inputFormatClass;
-  private Map<String, String> extraConf;
-
-  public static InputBundle fromSerialized(String serialized) {
-    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
-    try {
-      ObjectInputStream ois = new ObjectInputStream(bais);
-      InputBundle bundle = (InputBundle) ois.readObject();
-      ois.close();
-      return bundle;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public InputBundle(Class<? extends InputFormat> inputFormatClass) {
-    this.inputFormatClass = inputFormatClass;
-    this.extraConf = Maps.newHashMap();
-  }
-
-  public InputBundle set(String key, String value) {
-    this.extraConf.put(key, value);
-    return this;
-  }
-
-  public Class<? extends InputFormat> getInputFormatClass() {
-    return inputFormatClass;
-  }
-
-  public Map<String, String> getExtraConfiguration() {
-    return extraConf;
-  }
-
-  public Configuration configure(Configuration conf) {
-    for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      conf.set(e.getKey(), e.getValue());
-    }
-    return conf;
-  }
-
-  public String serialize() {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(this);
-      oos.close();
-      return Base64.encodeBase64String(baos.toByteArray());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public String getName() {
-    return inputFormatClass.getSimpleName();
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof InputBundle)) {
-      return false;
-    }
-    InputBundle oib = (InputBundle) other;
-    return inputFormatClass.equals(oib.inputFormatClass) && extraConf.equals(oib.extraConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
index d88ef4a..ad3414a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -20,9 +20,9 @@ package org.apache.crunch.io.text;
 import java.io.IOException;
 
 import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.InputBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.io.impl.InputBundle;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/092ef5d0/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
index c94676a..23cda77 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.InputBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
-import org.apache.crunch.io.impl.InputBundle;
 import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

Reply via email to