Updated Branches: refs/heads/master d91390d6c -> 71e678300
CRUNCH-246: HFileSource and related utilities (Thanks Ryan Brush for contributing HFileInputFormat)

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/71e67830
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/71e67830
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/71e67830

Branch: refs/heads/master
Commit: 71e67830076e328c81ec84ba745f0b8466752909
Parents: d91390d
Author: Chao Shi <[email protected]>
Authored: Mon Sep 2 12:22:00 2013 +0800
Committer: Chao Shi <[email protected]>
Committed: Mon Sep 2 12:22:00 2013 +0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileSourceIT.java   | 229 ++++++++++++++
 .../org/apache/crunch/io/hbase/FromHBase.java   |   9 +
 .../crunch/io/hbase/HFileInputFormat.java       | 220 ++++++++++++++
 .../org/apache/crunch/io/hbase/HFileSource.java |  76 +++++
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 302 ++++++++++++++++++-
 5 files changed, 835 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
new file mode 100644
index 0000000..61e7663
--- /dev/null
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class HFileSourceIT implements Serializable {
+
+  private static byte[] ROW1 = Bytes.toBytes("row1");
+  private static byte[] ROW2 = Bytes.toBytes("row2");
+  private static byte[] ROW3 = Bytes.toBytes("row3");
+  private static byte[] FAMILY1 = Bytes.toBytes("family1");
+  private static byte[] FAMILY2 = Bytes.toBytes("family2");
+  private static byte[] FAMILY3 = Bytes.toBytes("family3");
+  private static byte[] QUALIFIER1 = Bytes.toBytes("qualifier1");
+  private static byte[] QUALIFIER2 = Bytes.toBytes("qualifier2");
+  private static byte[] QUALIFIER3 = Bytes.toBytes("qualifier3");
+  private static byte[] QUALIFIER4 = Bytes.toBytes("qualifier4");
+  private static byte[] VALUE1 = Bytes.toBytes("value1");
+  private static byte[] VALUE2 = Bytes.toBytes("value2");
+  private static byte[] VALUE3 = Bytes.toBytes("value3");
+  private static byte[] VALUE4 = Bytes.toBytes("value4");
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+  private transient Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = tmpDir.getDefaultConfiguration();
+  }
+
+  @Test
+  public void testHFileSource() throws IOException {
+    List<KeyValue> kvs = generateKeyValues(100);
+    Path inputPath = tmpDir.getPath("in");
+    Path outputPath = tmpDir.getPath("out");
+    writeKeyValuesToHFile(inputPath, kvs);
+
+    Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
+    PCollection<KeyValue> in = pipeline.read(FromHBase.hfile(inputPath));
+    PCollection<String> texts = in.parallelDo(new MapFn<KeyValue, String>() {
+      @Override
+      public String map(KeyValue input) {
+        return input.toString();
+      }
+    }, strings());
+    texts.write(To.textFile(outputPath));
+    PipelineResult result = pipeline.run();
+    assertTrue(result.succeeded());
+
+    List<String> lines = FileUtils.readLines(new File(outputPath.toString(), "part-m-00000"));
+    assertEquals(kvs.size(), lines.size());
+    for (int i = 0; i < kvs.size(); i++) {
+      assertEquals(kvs.get(i).toString(), lines.get(i));
+    }
+  }
+
+  @Test
+  public void testScanHFiles() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 0, VALUE2));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertArrayEquals(ROW1, result.getRow());
+    assertEquals(2, result.raw().length);
+    assertArrayEquals(VALUE1, result.getColumnLatest(FAMILY1, QUALIFIER1).getValue());
+    assertArrayEquals(VALUE2, result.getColumnLatest(FAMILY1, QUALIFIER2).getValue());
+  }
+
+  @Test
+  public void testScanHFiles_maxVersions() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 3, VALUE3),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2));
+    Scan scan = new Scan();
+    scan.setMaxVersions(2);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    List<KeyValue> kvs2 = result.getColumn(FAMILY1, QUALIFIER1);
+    assertEquals(3, kvs2.size());
+    assertArrayEquals(VALUE3, kvs2.get(0).getValue());
+    assertArrayEquals(VALUE2, kvs2.get(1).getValue());
+    assertArrayEquals(VALUE1, kvs2.get(2).getValue());
+  }
+
+  @Test
+  public void testScanHFiles_startStopRows() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW3, FAMILY1, QUALIFIER1, 0, VALUE1));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW2);
+    scan.setStopRow(ROW3);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertArrayEquals(ROW2, result.getRow());
+  }
+
+  @Test
+  public void testScanHFiles_familyMap() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW1, FAMILY2, QUALIFIER2, 0, VALUE2),
+        new KeyValue(ROW1, FAMILY2, QUALIFIER3, 0, VALUE3),
+        new KeyValue(ROW1, FAMILY3, QUALIFIER4, 0, VALUE4));
+    Scan scan = new Scan();
+    scan.addFamily(FAMILY1);
+    scan.addColumn(FAMILY2, QUALIFIER2);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertEquals(2, result.size());
+    assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER1));
+    assertNotNull(result.getColumnLatest(FAMILY2, QUALIFIER2));
+  }
+
+  @Test
+  public void testScanHFiles_timeRange() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 3, VALUE3));
+    Scan scan = new Scan();
+    scan.setTimeRange(2, 3);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    Result result = Iterables.getOnlyElement(results);
+    assertEquals(1, result.size());
+    assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER2));
+  }
+
+  private List<Result> doTestScanHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
+    Path inputPath = tmpDir.getPath("in");
+    writeKeyValuesToHFile(inputPath, kvs);
+
+    Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
+    PCollection<Result> results = HFileUtils.scanHFiles(pipeline, inputPath, scan);
+    return ImmutableList.copyOf(results.materialize());
+  }
+
+  private List<KeyValue> generateKeyValues(int count) {
+    List<KeyValue> kvs = Lists.newArrayList();
+    for (int i = 0; i < count; i++) {
+      kvs.add(new KeyValue(
+          Bytes.toBytes("row_" + i),
+          Bytes.toBytes("family"),
+          Bytes.toBytes("qualifier_" + i)));
+    }
+    Collections.sort(kvs, KeyValue.COMPARATOR);
+    return kvs;
+  }
+
+  private Path writeKeyValuesToHFile(Path inputPath, List<KeyValue> kvs) throws IOException {
+    HFile.Writer w = null;
+    try {
+      List<KeyValue> sortedKVs = Lists.newArrayList(kvs);
+      Collections.sort(sortedKVs, KeyValue.COMPARATOR);
+      FileSystem fs = FileSystem.get(conf);
+      w = HFile.getWriterFactory(conf, new CacheConfig(conf))
+          .withPath(fs, inputPath)
+          .withComparator(KeyValue.KEY_COMPARATOR)
+          .create();
+      for (KeyValue kv : sortedKVs) {
+        w.append(kv);
+      }
+      return inputPath;
+    } finally {
+      IOUtils.closeQuietly(w);
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
index 221de9b..18d5a95 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/FromHBase.java
@@ -19,6 +19,8 @@ package org.apache.crunch.io.hbase;
 
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -36,4 +38,11 @@ public class FromHBase {
     return new HBaseSourceTarget(table, scan);
   }
 
+  public static Source<KeyValue> hfile(String path) {
+    return hfile(new Path(path));
+  }
+
+  public static Source<KeyValue> hfile(Path path) {
+    return new HFileSource(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
new file mode 100644
index 0000000..07b4b15
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple input format for HFiles.
+ */
+public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
+
+  private static final Log LOG = LogFactory.getLog(HFileInputFormat.class);
+  static final String START_ROW_KEY = "crunch.hbase.hfile.input.format.start.row";
+  static final String STOP_ROW_KEY = "crunch.hbase.hfile.input.format.stop.row";
+
+  /**
+   * File filter that removes all "hidden" files. This might be something worth removing from
+   * a more general-purpose utility; it accounts for the presence of metadata files created
+   * in the way we're doing exports.
+   */
+  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Record reader for HFiles.
+   */
+  private static class HFileRecordReader extends RecordReader<NullWritable, KeyValue> {
+
+    private Reader in;
+    protected Configuration conf;
+    private HFileScanner scanner;
+
+    /**
+     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
+     */
+    private KeyValue value = null;
+    private byte[] startRow = null;
+    private byte[] stopRow = null;
+    private boolean reachedStopRow = false;
+    private long count;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      conf = context.getConfiguration();
+      Path path = fileSplit.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+
+      long fileSize = fs.getFileStatus(path).getLen();
+
+      // Open the underlying input stream; it will be closed by the HFileReader below.
+      FSDataInputStream iStream = fs.open(path);
+      FixedFileTrailer fileTrailer = FixedFileTrailer.readFromStream(iStream, fileSize);
+
+      // If this class is generalized, it may need to account for different data block encodings.
+      this.in = new HFileReaderV2(path, fileTrailer, iStream, iStream, fileSize, true, new CacheConfig(conf),
+          DataBlockEncoding.NONE, new HFileSystem(fs));
+
+      // The file info must be loaded before the scanner can be used.
+      // This seems like a bug in HBase, but it's easily worked around.
+      this.in.loadFileInfo();
+      this.scanner = in.getScanner(false, false);
+
+      String startRowStr = conf.get(START_ROW_KEY);
+      if (startRowStr != null) {
+        this.startRow = decodeHexOrDie(startRowStr);
+      }
+      String stopRowStr = conf.get(STOP_ROW_KEY);
+      if (stopRowStr != null) {
+        this.stopRow = decodeHexOrDie(stopRowStr);
+      }
+    }
+
+    private static byte[] decodeHexOrDie(String s) {
+      try {
+        return Hex.decodeHex(s.toCharArray());
+      } catch (DecoderException e) {
+        throw new AssertionError("Failed to decode hex string: " + s);
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reachedStopRow) {
+        return false;
+      }
+      boolean hasNext;
+      if (!scanner.isSeeked()) {
+        if (startRow != null) {
+          LOG.info("Seeking to start row " + Bytes.toStringBinary(startRow));
+          KeyValue kv = KeyValue.createFirstOnRow(startRow);
+          hasNext = (scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()) >= 0);
+        } else {
+          LOG.info("Seeking to start");
+          hasNext = scanner.seekTo();
+        }
+      } else {
+        hasNext = scanner.next();
+      }
+      if (!hasNext) {
+        return false;
+      }
+      value = scanner.getKeyValue();
+      if (stopRow != null && Bytes.compareTo(
+          value.getBuffer(), value.getRowOffset(), value.getRowLength(),
+          stopRow, 0, stopRow.length) >= 0) {
+        LOG.info("Reached stop row " + Bytes.toStringBinary(stopRow));
+        reachedStopRow = true;
+        value = null;
+        return false;
+      }
+      count++;
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public KeyValue getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // This would be inaccurate if KVs are not uniformly sized or we have performed a seek to
+      // the start row, but it is better than nothing anyway.
+      return 1.0f * count / in.getEntries();
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+  }
+
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+
+    // Explode out directories that match the original FileInputFormat filters, since HFiles
+    // are written into directories whose name is the column family name.
+    for (FileStatus status : super.listStatus(job)) {
+      if (status.isDir()) {
+        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+          result.add(match);
+        }
+      } else {
+        result.add(status);
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public RecordReader<NullWritable, KeyValue> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new HFileRecordReader();
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    // This file isn't splittable.
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
new file mode 100644
index 0000000..13137b8
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.crunch.types.writable.Writables.writables;
+
+public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSource<KeyValue> {
+
+  private static final PType<KeyValue> KEY_VALUE_PTYPE = writables(KeyValue.class);
+
+  public HFileSource(Path path) {
+    this(ImmutableList.of(path));
+  }
+
+  public HFileSource(List<Path> paths) {
+    this(paths, new Scan());
+  }
+
+  // Package-local. We don't want this to be too open, because only a limited set of filters
+  // is supported so far (namely start/stop row). Users who need advanced filters should use
+  // HFileUtils#scanHFiles.
+  HFileSource(List<Path> paths, Scan scan) {
+    super(paths, KEY_VALUE_PTYPE, createInputFormatBundle(scan));
+  }
+
+  private static FormatBundle<HFileInputFormat> createInputFormatBundle(Scan scan) {
+    FormatBundle<HFileInputFormat> bundle = FormatBundle.forInput(HFileInputFormat.class);
+    if (!Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) {
+      bundle.set(HFileInputFormat.START_ROW_KEY, Hex.encodeHexString(scan.getStartRow()));
+    }
+    if (!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+      bundle.set(HFileInputFormat.STOP_ROW_KEY, Hex.encodeHexString(scan.getStopRow()));
+    }
+    return bundle;
+  }
+
+  @Override
+  public Iterable<KeyValue> read(Configuration conf) throws IOException {
+    throw new UnsupportedOperationException("HFileSource#read(Configuration) is not implemented yet");
+  }
+
+  @Override
+  public String toString() {
+    return "HFile(" + pathsAsString() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/71e67830/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index df98325..e2f1520 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.io.hbase;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,22 +30,35 @@
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.sort.TotalOrderPartitioner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
 
+import static org.apache.crunch.types.writable.Writables.bytes;
 import static org.apache.crunch.types.writable.Writables.nulls;
 import static org.apache.crunch.types.writable.Writables.tableOf;
 import static org.apache.crunch.types.writable.Writables.writables;
@@ -69,6 +83,118 @@
     }
   }
 
+  private static class StartRowFilterFn extends FilterFn<KeyValue> {
+
+    private final byte[] startRow;
+
+    private StartRowFilterFn(byte[] startRow) {
+      this.startRow = startRow;
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return Bytes.compareTo(input.getRow(), startRow) >= 0;
+    }
+  }
+
+  private static class StopRowFilterFn extends FilterFn<KeyValue> {
+
+    private final byte[] stopRow;
+
+    private StopRowFilterFn(byte[] stopRow) {
+      this.stopRow = stopRow;
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return Bytes.compareTo(input.getRow(), stopRow) < 0;
+    }
+  }
+
+  private static class FamilyMapFilterFn extends FilterFn<KeyValue> {
+
+    private static class Column implements Serializable {
+
+      private final byte[] family;
+      private final byte[] qualifier;
+
+      private Column(byte[] family, byte[] qualifier) {
+        this.family = family;
+        this.qualifier = qualifier;
+      }
+
+      private byte[] getFamily() {
+        return family;
+      }
+
+      private byte[] getQualifier() {
+        return qualifier;
+      }
+    }
+
+    private final List<byte[]> families = Lists.newArrayList();
+    private final List<Column> qualifiers = Lists.newArrayList();
+
+    private transient Set<ByteBuffer> familySet;
+    private transient Set<Pair<ByteBuffer, ByteBuffer>> qualifierSet;
+
+    private FamilyMapFilterFn(Map<byte[], NavigableSet<byte[]>> familyMap) {
+      // Hold the accepted families and qualifiers in Lists, as ByteBuffer is not Serializable.
+      for (Map.Entry<byte[], NavigableSet<byte[]>> e : familyMap.entrySet()) {
+        byte[] f = e.getKey();
+        if (e.getValue() == null) {
+          families.add(f);
+        } else {
+          for (byte[] q : e.getValue()) {
+            qualifiers.add(new Column(f, q));
+          }
+        }
+      }
+    }
+
+    @Override
+    public void initialize() {
+      ImmutableSet.Builder<ByteBuffer> familySetBuilder = ImmutableSet.builder();
+      ImmutableSet.Builder<Pair<ByteBuffer, ByteBuffer>> qualifierSetBuilder
+          = ImmutableSet.builder();
+      for (byte[] f : families) {
+        familySetBuilder.add(ByteBuffer.wrap(f));
+      }
+      for (Column e : qualifiers) {
+        byte[] f = e.getFamily();
+        byte[] q = e.getQualifier();
+        qualifierSetBuilder.add(Pair.of(ByteBuffer.wrap(f), ByteBuffer.wrap(q)));
+      }
+      this.familySet = familySetBuilder.build();
+      this.qualifierSet = qualifierSetBuilder.build();
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      byte[] b = input.getBuffer();
+      ByteBuffer f = ByteBuffer.wrap(b, input.getFamilyOffset(), input.getFamilyLength());
+      ByteBuffer q = ByteBuffer.wrap(b, input.getQualifierOffset(), input.getQualifierLength());
+      return familySet.contains(f) || qualifierSet.contains(Pair.of(f, q));
+    }
+  }
+
+  private static class TimeRangeFilterFn extends FilterFn<KeyValue> {
+
+    private final long minTimestamp;
+    private final long maxTimestamp;
+
+    private TimeRangeFilterFn(TimeRange timeRange) {
+      // Can't save the TimeRange in a member directly, as it is not Serializable.
+      this.minTimestamp = timeRange.getMin();
+      this.maxTimestamp = timeRange.getMax();
+    }
+
+    @Override
+    public boolean accept(KeyValue input) {
+      return (minTimestamp <= input.getTimestamp() && input.getTimestamp() < maxTimestamp);
+    }
+  }
+
   private static class KeyValueComparator implements RawComparator<KeyValue> {
 
     @Override
@@ -92,7 +218,84 @@ public final class HFileUtils {
     }
   }
 
-  private HFileUtils() {}
+  private static final MapFn<KeyValue, ByteBuffer> EXTRACT_ROW_FN = new MapFn<KeyValue, ByteBuffer>() {
+    @Override
+    public ByteBuffer map(KeyValue input) {
+      // We have to make a copy of the row, because the underlying buffer may be changed after this call.
+      return ByteBuffer.wrap(Arrays.copyOfRange(
+          input.getBuffer(), input.getRowOffset(), input.getRowOffset() + input.getRowLength()));
+    }
+  };
+
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path) {
+    return scanHFiles(pipeline, path, new Scan());
+  }
+
+  /**
+   * Scans HFiles with filter conditions.
+   *
+   * @param pipeline the pipeline
+   * @param path path to the HFiles
+   * @param scan the filtering conditions
+   * @return the matching rows as {@code Result}s
+   * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
+   */
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
+    // TODO(chaoshi): HFileInputFormat may skip some HFiles if their KVs do not fall into
+    // the range specified by this scan.
+    PCollection<KeyValue> in = pipeline.read(new HFileSource(ImmutableList.of(path), scan));
+    return combineIntoRow(in, scan);
+  }
+
+  public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs) {
+    return combineIntoRow(kvs, new Scan());
+  }
+
+  /**
+   * Combines {@link KeyValue}s into {@link Result}s.
+   *
+   * All {@code KeyValue}s that belong to the same row are combined into a single
+   * {@code Result}. Users may provide filter conditions (specified by {@code scan}).
+   * Deletes are dropped, and only a specified number of versions is kept.
+   *
+   * @param kvs the input {@code KeyValue}s
+   * @param scan filter conditions; currently start row, stop row, family map and
+   *             time range are supported
+   * @return the combined rows as {@code Result}s
+   */
+  public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs, Scan scan) {
+    if (!Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) {
+      kvs = kvs.filter(new StartRowFilterFn(scan.getStartRow()));
+    }
+    if (!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+      kvs = kvs.filter(new StopRowFilterFn(scan.getStopRow()));
+    }
+    if (scan.hasFamilies()) {
+      kvs = kvs.filter(new FamilyMapFilterFn(scan.getFamilyMap()));
+    }
+    TimeRange timeRange = scan.getTimeRange();
+    if (timeRange != null && (timeRange.getMin() > 0 || timeRange.getMax() < Long.MAX_VALUE)) {
+      kvs = kvs.filter(new TimeRangeFilterFn(timeRange));
+    }
+    // TODO(chaoshi): support Scan#getFilter
+
+    PTable<ByteBuffer, KeyValue> kvsByRow = kvs.by(EXTRACT_ROW_FN, bytes());
+    final int versions = scan.getMaxVersions();
+    return kvsByRow.groupByKey().parallelDo("CombineKeyValueIntoRow",
+        new DoFn<Pair<ByteBuffer, Iterable<KeyValue>>, Result>() {
+          @Override
+          public void process(Pair<ByteBuffer, Iterable<KeyValue>> input, Emitter<Result> emitter) {
+            List<KeyValue> kvs = Lists.newArrayList();
+            for (KeyValue kv : input.second()) {
+              kvs.add(kv.clone()); // assuming the input fits into memory
+            }
+            Result result = doCombineIntoRow(kvs, versions);
+            if (result == null) {
+              return;
+            }
+            emitter.emit(result);
+          }
+        }, writables(Result.class));
+  }
 
 public static void writeToHFilesForIncrementalLoad(
     PCollection<KeyValue> kvs,
@@ -178,4 +381,101 @@
     }
     writer.close();
   }
+
+  private static Result doCombineIntoRow(List<KeyValue> kvs, int versions) {
+    // shortcut for the common case
+    if (kvs.isEmpty()) {
+      return null;
+    }
+    if (kvs.size() == 1 && kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
+      return new Result(kvs);
+    }
+
+    kvs = maybeDeleteFamily(kvs);
+
+    // Sort the KeyValues in place by family and qualifier, and then by timestamp in descending order.
+    Collections.sort(kvs, KeyValue.COMPARATOR);
+
+    List<KeyValue> results = Lists.newArrayListWithCapacity(kvs.size());
+    for (int i = 0, j; i < kvs.size(); i = j) {
+      j = i + 1;
+      while (j < kvs.size() && hasSameFamilyAndQualifier(kvs.get(i), kvs.get(j))) {
+        j++;
+      }
+      results.addAll(getLatestKeyValuesOfColumn(kvs.subList(i, j), versions));
+    }
+    return new Result(results);
+  }
+
+  /**
+   * Removes any {@link KeyValue}s whose timestamp is less than or equal to the latest
+   * delete-family timestamp, as well as the delete-family {@code KeyValue}s themselves.
+   * Returns a new list when anything is removed; the input list is not modified.
+   */
+  private static List<KeyValue> maybeDeleteFamily(List<KeyValue> kvs) {
+    long deleteFamilyCut = 0;
+    for (KeyValue kv : kvs) {
+      if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
+        deleteFamilyCut = Math.max(deleteFamilyCut, kv.getTimestamp());
+      }
+    }
+    if (deleteFamilyCut == 0) {
+      return kvs;
+    }
+    List<KeyValue> results = Lists.newArrayList();
+    for (KeyValue kv : kvs) {
+      if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
+        continue;
+      }
+      if (kv.getTimestamp() <= deleteFamilyCut) {
+        continue;
+      }
+      results.add(kv);
+    }
+    return results;
+  }
+
+  private static boolean hasSameFamilyAndQualifier(KeyValue l, KeyValue r) {
+    return Bytes.equals(
+        l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
+        r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength())
+        && Bytes.equals(
+        l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
+        r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
+  }
+
+  /**
+   * Goes over the given {@link KeyValue}s and removes {@code Delete}s and {@code DeleteColumn}s.
+   *
+   * @param kvs {@code KeyValue}s of the same row and column, sorted by timestamp in
+   *            descending order
+   * @param versions the number of versions to keep
+   * @return the resulting {@code KeyValue}s, containing only {@code Put}s
+   */
+  private static List<KeyValue> getLatestKeyValuesOfColumn(List<KeyValue> kvs, int versions) {
+    if (kvs.isEmpty()) {
+      return kvs;
+    }
+    if (kvs.get(0).getType() == KeyValue.Type.Put.getCode()) {
+      return kvs; // shortcut for the common case
+    }
+
+    List<KeyValue> results = Lists.newArrayListWithCapacity(versions);
+    long previousDeleteTimestamp = -1;
+    for (KeyValue kv : kvs) {
+      if (results.size() >= versions) {
+        break;
+      }
+      if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
+        break;
+      } else if (kv.getType() == KeyValue.Type.Put.getCode()
+          && kv.getTimestamp() != previousDeleteTimestamp) {
+        results.add(kv);
+      } else if (kv.getType() == KeyValue.Type.Delete.getCode()) {
+        previousDeleteTimestamp = kv.getTimestamp();
+      } else {
+        throw new AssertionError("Unexpected KeyValue type: " + kv.getType());
+      }
+    }
+    return results;
+  }
 }
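
A minimal usage sketch of the higher-level API added here (not part of the commit; the job class and export path below are hypothetical). HFileUtils#scanHFiles reads the HFiles under a path and combines the KeyValues into Results, applying the start/stop row, family, and time-range conditions from the Scan:

    // Read exported HFiles and combine their KeyValues into rows,
    // honoring the Scan's filter conditions.
    Configuration conf = new Configuration();
    Pipeline pipeline = new MRPipeline(MyHFileScanJob.class, conf);
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("row100"));
    scan.setStopRow(Bytes.toBytes("row200"));  // exclusive, as in HBase scans
    PCollection<Result> rows = HFileUtils.scanHFiles(pipeline, new Path("/exports/mytable"), scan);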

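For raw access without row-combining semantics, FromHBase#hfile returns the KeyValues as-is; combineIntoRow can then be applied separately. Again a sketch, with a hypothetical path:

    // Read raw KeyValues (no delete handling, no filtering) ...
    PCollection<KeyValue> kvs = pipeline.read(FromHBase.hfile("/exports/mytable/family1"));
    // ... and optionally assemble them into Results with default Scan semantics.
    PCollection<Result> rows = HFileUtils.combineIntoRow(kvs);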