Repository: crunch Updated Branches: refs/heads/master 3a1d474b0 -> 8bae517ea
CRUNCH-550: Removed deprecations in crunch-hbase; also added support for TableName. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8bae517e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8bae517e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8bae517e Branch: refs/heads/master Commit: 8bae517ea1e1cb919ecc067eb6b877a4f58916f5 Parents: 3a1d474 Author: Micah Whitacre <[email protected]> Authored: Mon Jul 27 20:57:07 2015 -0500 Committer: Micah Whitacre <[email protected]> Committed: Tue Jul 28 22:22:53 2015 -0500 ---------------------------------------------------------------------- .../crunch/io/hbase/WordCountHBaseIT.java | 3 +- .../org/apache/crunch/io/hbase/FromHBase.java | 9 ++++++ .../org/apache/crunch/io/hbase/HBaseData.java | 18 ++++++++++-- .../crunch/io/hbase/HBaseSourceTarget.java | 30 +++++++++++++++----- .../org/apache/crunch/io/hbase/HBaseTarget.java | 18 +++++++++++- .../crunch/io/hbase/HFileInputFormat.java | 5 ++-- .../org/apache/crunch/io/hbase/HFileSource.java | 2 +- .../apache/crunch/io/hbase/HTableIterable.java | 10 +++++-- .../apache/crunch/io/hbase/HTableIterator.java | 14 +++++++-- .../org/apache/crunch/io/hbase/ToHBase.java | 5 ++++ 10 files changed, 93 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index 28ead90..4a06c0f 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -42,6 +42,7 @@ import 
org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; @@ -178,7 +179,7 @@ public class WordCountHBaseIT { HBaseSourceTarget source = null; if(clazz == null){ - source = new HBaseSourceTarget(inputTableName, scan, scan2); + source = new HBaseSourceTarget(TableName.valueOf(inputTableName), scan, scan2); }else{ source = new HBaseSourceTarget(inputTableName, clazz, new Scan[]{scan, scan2}); } http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java index 18d5a95..16f6694 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java @@ -21,6 +21,7 @@ import org.apache.crunch.Source; import org.apache.crunch.TableSource; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -35,6 +36,14 @@ public class FromHBase { } public static TableSource<ImmutableBytesWritable, Result> table(String table, Scan scan) { + return table(TableName.valueOf(table), scan); + } + + public static TableSource<ImmutableBytesWritable, Result> table(TableName table) { + return table(table, new Scan()); + } + + public static TableSource<ImmutableBytesWritable, Result> table(TableName table, 
Scan scan) { return new HBaseSourceTarget(table, scan); } http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java index 4a721f3..4ac6c8e 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java @@ -23,9 +23,13 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.util.StringUtils; @@ -36,11 +40,13 @@ import java.util.Set; public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> { private final String table; + private transient TableName tableName; private final String scansAsString; private transient SourceTarget parent; public HBaseData(String table, String scansAsString, SourceTarget<?> parent) { this.table = table; + this.tableName = TableName.valueOf(table); this.scansAsString = scansAsString; this.parent = parent; } @@ -63,7 +69,8 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu public Iterable<Pair<ImmutableBytesWritable, Result>> read( TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException { Configuration hconf = 
HBaseConfiguration.create(ctxt.getConfiguration()); - HTable htable = new HTable(hconf, table); + Connection connection = ConnectionFactory.createConnection(hconf); + Table htable = connection.getTable(getTableName()); String[] scanStrings = StringUtils.getStrings(scansAsString); int length = scanStrings == null ? 0 : scanStrings.length; @@ -72,6 +79,13 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]); } - return new HTableIterable(htable, scans); + return new HTableIterable(connection, htable, scans); + } + + private TableName getTableName(){ + if(tableName == null){ + tableName = TableName.valueOf(table); + } + return tableName; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index c98436d..ede7603 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -36,9 +36,12 @@ import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; import 
org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase; @@ -66,6 +69,7 @@ public class HBaseSourceTarget extends HBaseTarget implements protected Scan[] scans; protected String scansAsString; + private FormatBundle<? extends MultiTableInputFormatBase> inputBundle; public HBaseSourceTarget(String table, Scan scan) { @@ -75,25 +79,37 @@ public class HBaseSourceTarget extends HBaseTarget implements public HBaseSourceTarget(String table, Scan scan, Scan... additionalScans) { this(table, ObjectArrays.concat(scan, additionalScans)); } + + public HBaseSourceTarget(TableName table, Scan scan, Scan... additionalScans) { + this(table, ObjectArrays.concat(scan, additionalScans)); + } public HBaseSourceTarget(String table, Scan[] scans) { this(table, MultiTableInputFormat.class, scans); } + public HBaseSourceTarget(TableName table, Scan[] scans) { + this(table, MultiTableInputFormat.class, scans); + } + public HBaseSourceTarget(String table, Class<? extends MultiTableInputFormatBase> clazz, Scan[] scans) { - super(table); + this(TableName.valueOf(table), clazz, scans); + } + + public HBaseSourceTarget(TableName tableName, Class<? 
extends MultiTableInputFormatBase> clazz, Scan[] scans) { + super(tableName); this.scans = scans; try { - byte[] tableName = Bytes.toBytes(table); + byte[] tableNameAsBytes = Bytes.toBytes(table); //Copy scans and enforce that they are for the table specified Scan[] tableScans = new Scan[scans.length]; String[] scanStrings = new String[scans.length]; for(int i = 0; i < scans.length; i++){ tableScans[i] = new Scan(scans[i]); //enforce Scan is for same table - tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName); + tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableNameAsBytes); //Convert the Scan into a String scanStrings[i] = convertScanToString(tableScans[i]); } @@ -190,8 +206,9 @@ public class HBaseSourceTarget extends HBaseTarget implements @Override public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException { Configuration hconf = HBaseConfiguration.create(conf); - HTable htable = new HTable(hconf, table); - return new HTableIterable(htable, scans); + Connection connection = ConnectionFactory.createConnection(hconf); + Table htable = connection.getTable(getTableName()); + return new HTableIterable(connection, htable, scans); } @Override @@ -205,5 +222,4 @@ public class HBaseSourceTarget extends HBaseTarget implements outputConf(key, value); return this; } - } http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index 7c67577..f287d5e 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -34,6 +34,7 @@ import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -50,10 +51,18 @@ public class HBaseTarget implements MapReduceTarget { private static final Logger LOG = LoggerFactory.getLogger(HBaseTarget.class); protected String table; + + private transient TableName tableName; + private Map<String, String> extraConf = Maps.newHashMap(); public HBaseTarget(String table) { - this.table = table; + this(TableName.valueOf(table)); + } + + public HBaseTarget(TableName tableName){ + this.tableName = tableName; + this.table = tableName.getNameAsString(); } @Override @@ -153,4 +162,11 @@ public class HBaseTarget implements MapReduceTarget { ptype.getTypeClass()); } } + + protected TableName getTableName(){ + if(tableName == null){ + tableName = TableName.valueOf(table); + } + return tableName; + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java index 7381ddd..26821bf 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -185,7 +184,7 @@ public class HFileInputFormat extends 
FileInputFormat<NullWritable, KeyValue> { // to depend on it. private static boolean seekAtOrAfter(HFileScanner s, KeyValue k) throws IOException { - int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength()); + int result = s.seekTo(k); if(result < 0) { // Passed KV is smaller than first KV in file, work from start of file return s.seekTo(); @@ -206,7 +205,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> { // Explode out directories that match the original FileInputFormat filters since HFiles are written to directories where the // directory name is the column name for (FileStatus status : super.listStatus(job)) { - if (status.isDir()) { + if (status.isDirectory()) { FileSystem fs = status.getPath().getFileSystem(job.getConfiguration()); for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) { result.add(match); http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java index bd3cc8f..2240b9c 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java @@ -137,7 +137,7 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou } long sum = 0; for (FileStatus status : statuses) { - if (status.isDir()) { + if (status.isDirectory()) { sum += SourceTargetHelper.getPathSize(fs, status.getPath()); } else { sum += status.getLen(); http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java ---------------------------------------------------------------------- diff --git 
a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java index a3dfc7d..c772515 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java @@ -20,9 +20,11 @@ package org.apache.crunch.io.hbase; import org.apache.crunch.Pair; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import java.io.IOException; @@ -30,16 +32,18 @@ import java.util.Arrays; import java.util.Iterator; class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> { - private final HTable table; + private final Table table; private final Scan[] scans; + private final Connection connection; - public HTableIterable(HTable table, Scan... scans) { + public HTableIterable(Connection connection, Table table, Scan... 
scans) { this.table = table; + this.connection = connection; this.scans = scans; } @Override public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() { - return new HTableIterator(table, Arrays.asList(scans)); + return new HTableIterator(connection, table, Arrays.asList(scans)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java index 3db5897..ebef5d3 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java @@ -20,10 +20,11 @@ package org.apache.crunch.io.hbase; import org.apache.crunch.Pair; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,13 +36,15 @@ import java.util.List; class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { private static final Logger LOG = LoggerFactory.getLogger(HTableIterator.class); - private final HTable table; + private final Table table; + private final Connection connection; private final Iterator<Scan> scans; private ResultScanner scanner; private Iterator<Result> iter; - public HTableIterator(HTable table, List<Scan> scans) { + public HTableIterator(Connection connection, Table table, List<Scan> scans) { this.table = table; + this.connection = connection; this.scans = scans.iterator(); try{ this.scanner = 
table.getScanner(this.scans.next()); @@ -70,6 +73,11 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { } catch (IOException e) { LOG.error("Exception closing HTable: {}", table.getName(), e); } + try { + connection.close(); + } catch (IOException e) { + LOG.error("Exception closing HTable: {}", table.getName(), e); + } } } return hasNext; http://git-wip-us.apache.org/repos/asf/crunch/blob/8bae517e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java index 2c53ae1..78267cf 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/ToHBase.java @@ -19,6 +19,7 @@ package org.apache.crunch.io.hbase; import org.apache.crunch.Target; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; /** * Static factory methods for creating HBase {@link Target} types. @@ -26,6 +27,10 @@ import org.apache.hadoop.fs.Path; public class ToHBase { public static Target table(String table) { + return table(TableName.valueOf(table)); + } + + public static Target table(TableName table) { return new HBaseTarget(table); }
