Repository: crunch Updated Branches: refs/heads/master ce79a7a1b -> 077343a5d
CRUNCH-387: Added support for multiple scans on a single HBase table. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/077343a5 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/077343a5 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/077343a5 Branch: refs/heads/master Commit: 077343a5d7f777c8a6e26aee9ae64833f4a0ae47 Parents: ce79a7a Author: Micah Whitacre <[email protected]> Authored: Wed May 28 10:57:31 2014 -0500 Committer: Micah Whitacre <[email protected]> Committed: Wed May 28 10:57:31 2014 -0500 ---------------------------------------------------------------------- .../crunch/io/hbase/WordCountHBaseIT.java | 9 ++++- .../org/apache/crunch/io/hbase/HBaseData.java | 17 +++++--- .../crunch/io/hbase/HBaseSourceTarget.java | 42 +++++++++++++------- .../apache/crunch/io/hbase/HTableIterable.java | 13 +++--- .../apache/crunch/io/hbase/HTableIterator.java | 34 ++++++++++++---- 5 files changed, 79 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/077343a5/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index 13de752..de7b287 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -153,9 +153,16 @@ public class WordCountHBaseIT { key = put(inputTable, key, "dog"); inputTable.flushCommits(); + //Setup scan using multiple scans that simply cut the rows in half. Scan scan = new Scan(); scan.addFamily(WORD_COLFAM); - HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan); + byte[] cutoffPoint = Bytes.toBytes(2); + scan.setStopRow(cutoffPoint); + Scan scan2 = new Scan(); + scan.addFamily(WORD_COLFAM); + scan2.setStartRow(cutoffPoint); + + HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan, scan2); PTable<ImmutableBytesWritable, Result> words = pipeline.read(source); Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap(); http://git-wip-us.apache.org/repos/asf/crunch/blob/077343a5/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java index 84c39db..84de288 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.util.Set; @@ -35,12 +36,12 @@ import java.util.Set; public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> { private final String table; - private final String scanAsString; + private final String scansAsString; private transient SourceTarget parent; - public HBaseData(String table, String scanAsString, SourceTarget<?> parent) { + public HBaseData(String table, String scansAsString, SourceTarget<?> parent) { this.table = table; - this.scanAsString = scanAsString; + this.scansAsString = scansAsString; this.parent = parent; } @@ -63,7 +64,13 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException { Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration()); HTable htable = new HTable(hconf, table); - Scan scan = HBaseSourceTarget.convertStringToScan(scanAsString); - return new HTableIterable(htable, scan); + + String[] scanStrings = StringUtils.getStrings(scansAsString); + Scan[] scans = new Scan[scanStrings.length]; + for(int i = 0; i < scanStrings.length; i++){ + scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]); + } + + return new HTableIterable(htable, scans); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/077343a5/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index 99f7163..8601eb0 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; import org.apache.hadoop.hbase.mapreduce.MutationSerialization; import org.apache.hadoop.hbase.mapreduce.ResultSerialization; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; @@ -50,7 +51,9 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; public class HBaseSourceTarget extends HBaseTarget implements ReadableSourceTarget<Pair<ImmutableBytesWritable, Result>>, @@ -61,16 +64,31 @@ public class HBaseSourceTarget extends HBaseTarget implements private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf( Writables.writables(ImmutableBytesWritable.class), HBaseTypes.results()); - protected Scan scan; - private FormatBundle<TableInputFormat> inputBundle; + protected Scan[] scans; + protected String scansAsString; + private FormatBundle<MultiTableInputFormat> inputBundle; - public HBaseSourceTarget(String table, Scan scan) { + public HBaseSourceTarget(String table, Scan... scans) { super(table); - this.scan = scan; + this.scans = scans; + try { - this.inputBundle = FormatBundle.forInput(TableInputFormat.class) - .set(TableInputFormat.INPUT_TABLE, table) - .set(TableInputFormat.SCAN, convertScanToString(scan)); + + byte[] tableName = Bytes.toBytes(table); + //Copy scans and enforce that they are for the table specified + Scan[] tableScans = new Scan[scans.length]; + String[] scanStrings = new String[scans.length]; + for(int i = 0; i < scans.length; i++){ + tableScans[i] = new Scan(scans[i]); + //enforce Scan is for same table + tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName); + //Convert the Scan into a String + scanStrings[i] = convertScanToString(tableScans[i]); + } + this.scans = tableScans; + this.scansAsString = StringUtils.arrayToString(scanStrings); + this.inputBundle = FormatBundle.forInput(MultiTableInputFormat.class) + .set(MultiTableInputFormat.SCANS, scansAsString); } catch (IOException e) { throw new RuntimeException(e); } @@ -104,7 +122,7 @@ public class HBaseSourceTarget extends HBaseTarget implements @Override public int hashCode() { - return new HashCodeBuilder().append(table).append(scan).toHashCode(); + return new HashCodeBuilder().append(table).append(scansAsString).toHashCode(); } @Override @@ -161,16 +179,12 @@ public class HBaseSourceTarget extends HBaseTarget implements public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException { Configuration hconf = HBaseConfiguration.create(conf); HTable htable = new HTable(hconf, table); - return new HTableIterable(htable, scan); + return new HTableIterable(htable, scans); } @Override public ReadableData<Pair<ImmutableBytesWritable, Result>> asReadable() { - try { - return new HBaseData(table, convertScanToString(scan), this); - } catch (IOException e) { - throw new RuntimeException(e); - } + return new HBaseData(table, scansAsString, this); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/077343a5/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java index c58732c..a3dfc7d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java @@ -26,23 +26,20 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> { private final HTable table; - private final Scan scan; + private final Scan[] scans; - public HTableIterable(HTable table, Scan scan) { + public HTableIterable(HTable table, Scan... scans) { this.table = table; - this.scan = scan; + this.scans = scans; } @Override public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() { - try { - return new HTableIterator(table, table.getScanner(scan)); - } catch (IOException e) { - throw new RuntimeException(e); - } + return new HTableIterator(table, Arrays.asList(scans)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/077343a5/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java index bfe1439..22057bb 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java @@ -25,21 +25,29 @@ import org.apache.crunch.Pair; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import java.io.IOException; import java.util.Iterator; +import java.util.List; class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { private static final Log LOG = LogFactory.getLog(HTableIterator.class); private final HTable table; - private final ResultScanner scanner; - private final Iterator<Result> iter; + private final Iterator<Scan> scans; + private ResultScanner scanner; + private Iterator<Result> iter; - public HTableIterator(HTable table, ResultScanner scanner) { + public HTableIterator(HTable table, List<Scan> scans) { this.table = table; - this.scanner = scanner; + this.scans = scans.iterator(); + try{ + this.scanner = table.getScanner(this.scans.next()); + }catch(IOException ioe){ + throw new RuntimeException(ioe); + } this.iter = scanner.iterator(); } @@ -48,10 +56,20 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { boolean hasNext = iter.hasNext(); if (!hasNext) { scanner.close(); - try { - table.close(); - } catch (IOException e) { - LOG.error("Exception closing HTable: " + table.getName(), e); + hasNext = scans.hasNext(); + if(hasNext){ + try{ + scanner = table.getScanner(this.scans.next()); + iter = scanner.iterator(); + } catch(IOException ioe){ + throw new RuntimeException("Unable to create next scanner from "+ table.getName(), ioe); + } + } else { + try { + table.close(); + } catch (IOException e) { + LOG.error("Exception closing HTable: " + table.getName(), e); + } } } return hasNext;
