Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 dbec907af -> 8fe96d6cc
CRUNCH-387: Added support for the 0.8 code stream to support multiple HBase Scans Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8fe96d6c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8fe96d6c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8fe96d6c Branch: refs/heads/apache-crunch-0.8 Commit: 8fe96d6ccb7bb53a5fa0c3f011f3e6e6e8133838 Parents: dbec907 Author: Micah Whitacre <[email protected]> Authored: Wed May 28 10:57:31 2014 -0500 Committer: Micah Whitacre <[email protected]> Committed: Wed May 28 12:09:06 2014 -0500 ---------------------------------------------------------------------- .../crunch/io/hbase/WordCountHBaseIT.java | 13 +++++- .../org/apache/crunch/io/hbase/HBaseData.java | 17 +++++--- .../crunch/io/hbase/HBaseSourceTarget.java | 43 +++++++++++++------- .../apache/crunch/io/hbase/HTableIterable.java | 13 +++--- .../apache/crunch/io/hbase/HTableIterator.java | 35 ++++++++++++---- pom.xml | 2 +- 6 files changed, 85 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index af32c1a..2bbb70b 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -69,6 +69,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteStreams; +import javax.ws.rs.HEAD; + public class WordCountHBaseIT { static class StringifyFn extends MapFn<Pair<ImmutableBytesWritable, Pair<Result, Result>>, String> { @@ -237,9 +239,18 @@ public class WordCountHBaseIT { key = put(inputTable, key, "cat"); key = put(inputTable, key, "cat"); key = put(inputTable, key, "dog"); + inputTable.flushCommits(); + + //Setup scan using multiple scans that simply cut the rows in half. Scan scan = new Scan(); scan.addFamily(WORD_COLFAM); - HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan); + byte[] cutoffPoint = Bytes.toBytes(2); + scan.setStopRow(cutoffPoint); + Scan scan2 = new Scan(); + scan.addFamily(WORD_COLFAM); + scan2.setStartRow(cutoffPoint); + + HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan, scan2); PTable<ImmutableBytesWritable, Result> words = pipeline.read(source); Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap(); http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java index 84c39db..84de288 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.util.Set; @@ -35,12 +36,12 @@ import java.util.Set; public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> { private final String table; - private final String scanAsString; + private final String scansAsString; private transient SourceTarget parent; - public HBaseData(String table, String scanAsString, SourceTarget<?> parent) { + public HBaseData(String table, String scansAsString, SourceTarget<?> parent) { this.table = table; - this.scanAsString = scanAsString; + this.scansAsString = scansAsString; this.parent = parent; } @@ -63,7 +64,13 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException { Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration()); HTable htable = new HTable(hconf, table); - Scan scan = HBaseSourceTarget.convertStringToScan(scanAsString); - return new HTableIterable(htable, scan); + + String[] scanStrings = StringUtils.getStrings(scansAsString); + Scan[] scans = new Scan[scanStrings.length]; + for(int i = 0; i < scanStrings.length; i++){ + scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]); + } + + return new HTableIterable(htable, scans); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index c1d7eb7..6ed3b42 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; public class HBaseSourceTarget extends HBaseTarget implements ReadableSourceTarget<Pair<ImmutableBytesWritable, Result>>, @@ -60,16 +62,31 @@ public class HBaseSourceTarget extends HBaseTarget implements private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf( Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class)); - protected Scan scan; - private FormatBundle<TableInputFormat> inputBundle; + protected Scan[] scans; + protected String scansAsString; + private FormatBundle<MultiTableInputFormat> inputBundle; - public HBaseSourceTarget(String table, Scan scan) { + public HBaseSourceTarget(String table, Scan... scans) { super(table); - this.scan = scan; + this.scans = scans; + try { - this.inputBundle = FormatBundle.forInput(TableInputFormat.class) - .set(TableInputFormat.INPUT_TABLE, table) - .set(TableInputFormat.SCAN, convertScanToString(scan)); + + byte[] tableName = Bytes.toBytes(table); + //Copy scans and enforce that they are for the table specified + Scan[] tableScans = new Scan[scans.length]; + String[] scanStrings = new String[scans.length]; + for(int i = 0; i < scans.length; i++){ + tableScans[i] = new Scan(scans[i]); + //enforce Scan is for same table + tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName); + //Convert the Scan into a String + scanStrings[i] = convertScanToString(tableScans[i]); + } + this.scans = tableScans; + this.scansAsString = StringUtils.arrayToString(scanStrings); + this.inputBundle = FormatBundle.forInput(MultiTableInputFormat.class) + .set(MultiTableInputFormat.SCANS, scansAsString); } catch (IOException e) { throw new RuntimeException(e); } @@ -103,7 +120,7 @@ public class HBaseSourceTarget extends HBaseTarget implements @Override public int hashCode() { - return new HashCodeBuilder().append(table).append(scan).toHashCode(); + return new HashCodeBuilder().append(table).append(scansAsString).toHashCode(); } @Override @@ -161,16 +178,12 @@ public class HBaseSourceTarget extends HBaseTarget implements public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException { Configuration hconf = HBaseConfiguration.create(conf); HTable htable = new HTable(hconf, table); - return new HTableIterable(htable, scan); + return new HTableIterable(htable, scans); } @Override public ReadableData<Pair<ImmutableBytesWritable, Result>> asReadable() { - try { - return new HBaseData(table, convertScanToString(scan), this); - } catch (IOException e) { - throw new RuntimeException(e); - } + return new HBaseData(table, scansAsString, this); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java index c58732c..a3dfc7d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java @@ -26,23 +26,20 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> { private final HTable table; - private final Scan scan; + private final Scan[] scans; - public HTableIterable(HTable table, Scan scan) { + public HTableIterable(HTable table, Scan... scans) { this.table = table; - this.scan = scan; + this.scans = scans; } @Override public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() { - try { - return new HTableIterator(table, table.getScanner(scan)); - } catch (IOException e) { - throw new RuntimeException(e); - } + return new HTableIterator(table, Arrays.asList(scans)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java index daa4a48..d679b72 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java @@ -25,21 +25,30 @@ import org.apache.crunch.Pair; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.Iterator; +import java.util.List; class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { private static final Log LOG = LogFactory.getLog(HTableIterator.class); private final HTable table; - private final ResultScanner scanner; - private final Iterator<Result> iter; + private final Iterator<Scan> scans; + private ResultScanner scanner; + private Iterator<Result> iter; - public HTableIterator(HTable table, ResultScanner scanner) { + public HTableIterator(HTable table, List<Scan> scans) { this.table = table; - this.scanner = scanner; + this.scans = scans.iterator(); + try{ + this.scanner = table.getScanner(this.scans.next()); + }catch(IOException ioe){ + throw new RuntimeException(ioe); + } this.iter = scanner.iterator(); } @@ -48,10 +57,20 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { boolean hasNext = iter.hasNext(); if (!hasNext) { scanner.close(); - try { - table.close(); - } catch (IOException e) { - LOG.error("Exception closing HTable: " + table.getTableName(), e); + hasNext = scans.hasNext(); + if(hasNext){ + try{ + scanner = table.getScanner(this.scans.next()); + iter = scanner.iterator(); + } catch(IOException ioe){ + throw new RuntimeException("Unable to create next scanner from "+ Bytes.toString(table.getTableName()), ioe); + } + } else { + try { + table.close(); + } catch (IOException e) { + LOG.error("Exception closing HTable: " + Bytes.toString(table.getTableName()), e); + } } } return hasNext; http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4f82457..224d5a6 100644 --- a/pom.xml +++ b/pom.xml @@ -86,7 +86,7 @@ under the License. <mockito.version>1.9.0</mockito.version> <pkg>org.apache.crunch</pkg> <hadoop.version>1.1.2</hadoop.version> - <hbase.version>0.94.3</hbase.version> + <hbase.version>0.94.15</hbase.version> <scala.base.version>2.10</scala.base.version> <scala.version>2.10.4</scala.version>
