Updated Branches:
  refs/heads/master 9a63b294c -> 35136cb4e

CRUNCH-261: Make HBase sources readable.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/35136cb4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/35136cb4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/35136cb4

Branch: refs/heads/master
Commit: 35136cb4eaf45b6e4c69932d4a1dca04a68ce5c6
Parents: 9a63b29
Author: Josh Wills <[email protected]>
Authored: Sat Sep 7 13:22:45 2013 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon Sep 9 10:55:50 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileSourceIT.java   | 16 +++-
 .../crunch/io/hbase/WordCountHBaseIT.java       |  5 ++
 .../crunch/io/hbase/HBaseSourceTarget.java      | 72 +++++++++++++++-
 .../crunch/io/hbase/HFileReaderFactory.java     | 88 ++++++++++++++++++++
 .../org/apache/crunch/io/hbase/HFileSource.java |  8 +-
 5 files changed, 185 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
index 61e7663..9363aba 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -81,7 +81,6 @@ public class HFileSourceIT implements Serializable {
     conf = tmpDir.getDefaultConfiguration();
   }
 
-  @Test
   public void testHFileSource() throws IOException {
     List<KeyValue> kvs = generateKeyValues(100);
     Path inputPath = tmpDir.getPath("in");
@@ -108,6 +107,12 @@ public class HFileSourceIT implements Serializable {
   }
 
   @Test
+  public void testReadHFile() throws Exception {
+    List<KeyValue> kvs = generateKeyValues(100);
+    assertEquals(kvs, doTestReadHFiles(kvs, new Scan()));
+  }
+
+  @Test
   public void testScanHFiles() throws IOException {
     List<KeyValue> kvs = ImmutableList.of(
         new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
@@ -196,6 +201,15 @@ public class HFileSourceIT implements Serializable {
     return ImmutableList.copyOf(results.materialize());
   }
 
+  private List<KeyValue> doTestReadHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
+    Path inputPath = tmpDir.getPath("in");
+    writeKeyValuesToHFile(inputPath, kvs);
+
+    Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
+    PCollection<KeyValue> results = pipeline.read(FromHBase.hfile(inputPath));
+    return ImmutableList.copyOf(results.materialize());
+  }
+
   private List<KeyValue> generateKeyValues(int count) {
     List<KeyValue> kvs = Lists.newArrayList();
     for (int i = 0; i < count; i++) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 6375ea1..149e359 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Map;
 import java.util.Random;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
@@ -243,6 +244,10 @@ public class WordCountHBaseIT {
     scan.addFamily(WORD_COLFAM);
     HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
     PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
+
+    Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
+    assertEquals(3, materialized.size());
+
     PCollection<Put> puts = wordCount(words);
     pipeline.write(puts, new HBaseTarget(outputTableName));
     pipeline.write(puts, new HBaseTarget(otherTableName));

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 71e752a..c003e48 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -20,16 +20,17 @@ package org.apache.crunch.io.hbase;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
 import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -37,7 +38,9 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
@@ -45,7 +48,8 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.mapreduce.Job;
 
-public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
+public class HBaseSourceTarget extends HBaseTarget implements
+    ReadableSourceTarget<Pair<ImmutableBytesWritable, Result>>,
     TableSource<ImmutableBytesWritable, Result> {
 
   private static final Log LOG = LogFactory.getLog(HBaseSourceTarget.class);
@@ -134,4 +138,68 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
   public Converter<?, ?, ?, ?> getConverter() {
     return PTYPE.getConverter();
   }
+
+  @Override
+  public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException {
+    Configuration hconf = HBaseConfiguration.create(conf);
+    HTable htable = new HTable(hconf, table);
+    return new HTableIterable(htable, scan);
+  }
+
+  private static class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
+    private final HTable table;
+    private final Scan scan;
+
+    public HTableIterable(HTable table, Scan scan) {
+      this.table = table;
+      this.scan = scan;
+    }
+
+    @Override
+    public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
+      try {
+        return new HTableIterator(table, table.getScanner(scan));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
+
+    private final HTable table;
+    private final ResultScanner scanner;
+    private final Iterator<Result> iter;
+
+    public HTableIterator(HTable table, ResultScanner scanner) {
+      this.table = table;
+      this.scanner = scanner;
+      this.iter = scanner.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      boolean hasNext = iter.hasNext();
+      if (!hasNext) {
+        scanner.close();
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.error("Exception closing HTable: " + table.getTableName(), e);
+        }
+      }
+      return hasNext;
+    }
+
+    @Override
+    public Pair<ImmutableBytesWritable, Result> next() {
+      Result next = iter.next();
+      return Pair.of(new ImmutableBytesWritable(next.getRow()), next);
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
new file mode 100644
index 0000000..f5db516
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
+
+  public static final String HFILE_SCANNER_CACHE_BLOCKS = "crunch.hfile.scanner.cache.blocks";
+  public static final String HFILE_SCANNER_PREAD = "crunch.hfile.scanner.pread";
+
+  @Override
+  public Iterator<KeyValue> read(FileSystem fs, Path path) {
+    Configuration conf = fs.getConf();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    try {
+      HFile.Reader hfr = HFile.createReader(fs, path, cacheConfig);
+      HFileScanner scanner = hfr.getScanner(
+          conf.getBoolean(HFILE_SCANNER_CACHE_BLOCKS, false),
+          conf.getBoolean(HFILE_SCANNER_PREAD, false));
+      scanner.seekTo();
+      return new HFileIterator(scanner);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static class HFileIterator implements Iterator<KeyValue> {
+
+    private final HFileScanner scanner;
+    private KeyValue curr;
+
+    public HFileIterator(HFileScanner scanner) {
+      this.scanner = scanner;
+      this.curr = scanner.getKeyValue();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return curr != null;
+    }
+
+    @Override
+    public KeyValue next() {
+      KeyValue ret = curr;
+      try {
+        if (scanner.next()) {
+          curr = scanner.getKeyValue();
+        } else {
+          curr = null;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return ret;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("HFileIterator is read-only");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index 7fa7280..c1d15a2 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -66,7 +66,13 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou
 
   @Override
   public Iterable<KeyValue> read(Configuration conf) throws IOException {
-    throw new UnsupportedOperationException("HFileSource#read(Configuration) is not implemented yet");
+    conf = new Configuration(conf);
+    inputBundle.configure(conf);
+    if (conf.get(HFileInputFormat.START_ROW_KEY) != null ||
+        conf.get(HFileInputFormat.STOP_ROW_KEY) != null) {
+      throw new IllegalStateException("Cannot filter row ranges in HFileSource.read");
+    }
+    return read(conf, new HFileReaderFactory());
  }
 
   @Override
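
For readers following this change: the commit makes HBaseSourceTarget a ReadableSourceTarget and gives
HFileSource a working read(Configuration) backed by the new HFileReaderFactory, so both can be materialized
on the client. Below is a minimal usage sketch modeled on the integration tests above; it is not part of the
commit, the class name, table name, and HFile path are hypothetical, and it assumes the HTable-era HBase
client API used by this branch.

import java.util.Map;

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.FromHBase;
import org.apache.crunch.io.hbase.HBaseSourceTarget;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

public class ReadableHBaseSourcesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Pipeline pipeline = new MRPipeline(ReadableHBaseSourcesSketch.class, conf);

    // HFiles: HFileSource#read(Configuration) is now backed by HFileReaderFactory,
    // so the KeyValues can be materialized on the client (hypothetical path).
    PCollection<KeyValue> kvs = pipeline.read(FromHBase.hfile(new Path("/tmp/hfiles")));
    for (KeyValue kv : kvs.materialize()) {
      System.out.println(kv);
    }

    // HBase tables: HBaseSourceTarget now implements ReadableSourceTarget and scans
    // the table via HTable/ResultScanner, so materializeToMap() works (hypothetical table).
    HBaseSourceTarget source = new HBaseSourceTarget("my_table", new Scan());
    PTable<ImmutableBytesWritable, Result> rows = pipeline.read(source);
    Map<ImmutableBytesWritable, Result> byRow = rows.materializeToMap();
    System.out.println("rows read: " + byRow.size());

    pipeline.done();
  }
}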
