Repository: crunch Updated Branches: refs/heads/master d176778cf -> b06c5cc27
CRUNCH-533: Extend HBaseSourceTarget to support custom MultiTableInputFormat implementations. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b06c5cc2 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b06c5cc2 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b06c5cc2 Branch: refs/heads/master Commit: b06c5cc27b97d92d5c0b689cf58af90166155717 Parents: d176778 Author: Micah Whitacre <[email protected]> Authored: Mon Jun 29 20:53:45 2015 -0500 Committer: Micah Whitacre <[email protected]> Committed: Mon Jun 29 20:53:45 2015 -0500 ---------------------------------------------------------------------- .../crunch/io/hbase/WordCountHBaseIT.java | 30 +++++++++++++++++++- .../crunch/io/hbase/HBaseSourceTarget.java | 9 ++++-- 2 files changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b06c5cc2/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index dd48352..28ead90 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; @@ -48,6 +49,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; @@ -129,6 +132,12 @@ public class WordCountHBaseIT { run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration())); } + @Test + public void testWordCountCustomFormat() throws Exception { + run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration()), MyTableInputFormat.class); + assertTrue(MyTableInputFormat.CONSTRUCTED.get()); + } + @After public void tearDown() throws Exception { hbaseTestUtil.shutdownMiniHBaseCluster(); @@ -136,6 +145,10 @@ public class WordCountHBaseIT { } public void run(Pipeline pipeline) throws Exception { + run(pipeline, null); + } + + public void run(Pipeline pipeline, Class<? extends MultiTableInputFormatBase> clazz) throws Exception { Random rand = new Random(); int postFix = rand.nextInt() & 0x7FFFFFFF; @@ -163,7 +176,13 @@ public class WordCountHBaseIT { scan.addFamily(WORD_COLFAM); scan2.setStartRow(cutoffPoint); - HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan, scan2); + HBaseSourceTarget source = null; + if(clazz == null){ + source = new HBaseSourceTarget(inputTableName, scan, scan2); + }else{ + source = new HBaseSourceTarget(inputTableName, clazz, new Scan[]{scan, scan2}); + } + PTable<ImmutableBytesWritable, Result> words = pipeline.read(source); Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap(); @@ -237,4 +256,13 @@ public class WordCountHBaseIT { assertTrue(result.isEmpty()); } + public static class MyTableInputFormat extends MultiTableInputFormat{ + + public static final AtomicBoolean CONSTRUCTED = new AtomicBoolean(); + + public MyTableInputFormat(){ + CONSTRUCTED.set(true); + } + } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/b06c5cc2/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index a957898..c98436d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase; import org.apache.hadoop.hbase.mapreduce.ResultSerialization; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -65,7 +66,7 @@ public class HBaseSourceTarget extends HBaseTarget implements protected Scan[] scans; protected String scansAsString; - private FormatBundle<MultiTableInputFormat> inputBundle; + private FormatBundle<? extends MultiTableInputFormatBase> inputBundle; public HBaseSourceTarget(String table, Scan scan) { this(table, new Scan[] { scan }); @@ -76,6 +77,10 @@ public class HBaseSourceTarget extends HBaseTarget implements } public HBaseSourceTarget(String table, Scan[] scans) { + this(table, MultiTableInputFormat.class, scans); + } + + public HBaseSourceTarget(String table, Class<? extends MultiTableInputFormatBase> clazz, Scan[] scans) { super(table); this.scans = scans; @@ -94,7 +99,7 @@ public class HBaseSourceTarget extends HBaseTarget implements } this.scans = tableScans; this.scansAsString = StringUtils.arrayToString(scanStrings); - this.inputBundle = FormatBundle.forInput(MultiTableInputFormat.class) + this.inputBundle = FormatBundle.forInput(clazz) .set(MultiTableInputFormat.SCANS, scansAsString); } catch (IOException e) { throw new RuntimeException(e);
