Updated Branches: refs/heads/master 995f4d985 -> 013f2e19a
CRUNCH-267: Fix several HFileUtils#scanHFiles related problems

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/013f2e19
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/013f2e19
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/013f2e19

Branch: refs/heads/master
Commit: 013f2e19a7d666caca352cb3874e407e682a2436
Parents: 995f4d9
Author: Chao Shi <[email protected]>
Authored: Fri Sep 20 11:29:32 2013 +0800
Committer: Chao Shi <[email protected]>
Committed: Fri Sep 20 11:29:32 2013 +0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileSourceIT.java   | 73 +++++++++++++++++++
 .../crunch/io/hbase/HFileInputFormat.java       | 45 +++++++++-----
 .../io/hbase/HFileOutputFormatForCrunch.java    |  4 +-
 .../org/apache/crunch/io/hbase/HFileSource.java | 38 ++++++++++
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 75 +++++++++++++++++---
 5 files changed, 205 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
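
In summary, the patch fixes three things: HFileInputFormat no longer drops an entire HFile when the Scan's start row sorts before (or is absent from) the file's keys; HFileSource now estimates its input size across the nested <family>/<file> directory layout so the reducer count is sensible; and the scanHFiles/combineIntoRow path now applies HBase delete-marker semantics. The one new public entry point is the List<Path> overload of HFileUtils#scanHFiles. A minimal usage sketch, assuming crunch-hbase and 0.94-era HBase on the classpath (the paths and row keys are illustrative, not from this commit):

    import java.util.List;

    import com.google.common.collect.ImmutableList;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.hbase.HFileUtils;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanHFilesExample {
      public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(ScanHFilesExample.class);

        // Rows in [row100, row200); a start row that precedes every key in a
        // file no longer causes that file to be skipped.
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("row100"));
        scan.setStopRow(Bytes.toBytes("row200"));

        // Hypothetical HFile directories, one per column family.
        List<Path> paths = ImmutableList.of(new Path("/hfiles/cf1"), new Path("/hfiles/cf2"));
        PCollection<Result> rows = HFileUtils.scanHFiles(pipeline, paths, scan);

        for (Result result : rows.materialize()) {
          System.out.println(Bytes.toString(result.getRow()));
        }
        pipeline.done();
      }
    }
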
http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
index 9363aba..f45bbf9 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -53,6 +53,7 @@ import static org.apache.crunch.types.writable.Writables.strings;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class HFileSourceIT implements Serializable {
@@ -81,6 +82,7 @@ public class HFileSourceIT implements Serializable {
     conf = tmpDir.getDefaultConfiguration();
   }
 
+  @Test
   public void testHFileSource() throws IOException {
     List<KeyValue> kvs = generateKeyValues(100);
     Path inputPath = tmpDir.getPath("in");
@@ -160,6 +162,42 @@ public class HFileSourceIT implements Serializable {
   }
 
   @Test
+  public void testScanHFiles_startRowIsTooSmall() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW3, FAMILY1, QUALIFIER1, 0, VALUE1));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW1);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(2, results.size());
+    assertArrayEquals(ROW2, results.get(0).getRow());
+    assertArrayEquals(ROW3, results.get(1).getRow());
+  }
+
+  @Test
+  public void testScanHFiles_startRowIsTooLarge() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW2, FAMILY1, QUALIFIER1, 0, VALUE1));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW3);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(0, results.size());
+  }
+
+  @Test
+  public void testScanHFiles_startRowDoesNotExist() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
+        new KeyValue(ROW3, FAMILY3, QUALIFIER3, 0, VALUE3));
+    Scan scan = new Scan();
+    scan.setStartRow(ROW2);
+    List<Result> results = doTestScanHFiles(kvs, scan);
+    assertEquals(1, results.size());
+    assertArrayEquals(ROW3, results.get(0).getRow());
+  }
+
+  @Test
   public void testScanHFiles_familyMap() throws IOException {
     List<KeyValue> kvs = ImmutableList.of(
         new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
@@ -192,6 +230,41 @@ public class HFileSourceIT implements Serializable {
     assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER2));
   }
 
+  @Test
+  public void testScanHFiles_delete() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.Delete));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(1, results.size());
+    assertArrayEquals(VALUE1, results.get(0).getValue(FAMILY1, QUALIFIER1));
+  }
+
+  @Test
+  public void testScanHFiles_deleteColumn() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.DeleteColumn));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(0, results.size());
+  }
+
+  @Test
+  public void testScanHFiles_deleteFamily() throws IOException {
+    List<KeyValue> kvs = ImmutableList.of(
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 1, VALUE1),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER2, 2, VALUE2),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER3, 3, VALUE3),
+        new KeyValue(ROW1, FAMILY1, QUALIFIER1, 2, KeyValue.Type.DeleteFamily));
+    List<Result> results = doTestScanHFiles(kvs, new Scan());
+    assertEquals(1, results.size());
+    assertNull(results.get(0).getValue(FAMILY1, QUALIFIER1));
+    assertNull(results.get(0).getValue(FAMILY1, QUALIFIER2));
+    assertArrayEquals(VALUE3, results.get(0).getValue(FAMILY1, QUALIFIER3));
+  }
+
   private List<Result> doTestScanHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
     Path inputPath = tmpDir.getPath("in");
     writeKeyValuesToHFile(inputPath, kvs);
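
The three delete tests pin down the tombstone semantics the scan path now honors: a Delete masks only the Put with the same timestamp, a DeleteColumn masks that version and every older version of the column, and a DeleteFamily masks all cells in the family at or below its timestamp (which is why VALUE3 at timestamp 3 survives the DeleteFamily at 2 above). A short sketch of how such markers are built, using the same KeyValue constructor as the tests (the row/family/qualifier values are illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TombstoneExample {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("row1");
        byte[] family = Bytes.toBytes("f");
        byte[] qualifier = Bytes.toBytes("q");

        // Masks only the cell version written at timestamp 2.
        KeyValue delete = new KeyValue(row, family, qualifier, 2, KeyValue.Type.Delete);
        // Masks timestamp 2 and everything older in this column.
        KeyValue deleteColumn = new KeyValue(row, family, qualifier, 2, KeyValue.Type.DeleteColumn);
        // Masks every column of the family at timestamp <= 2.
        KeyValue deleteFamily = new KeyValue(row, family, qualifier, 2, KeyValue.Type.DeleteFamily);

        System.out.println(delete);
        System.out.println(deleteColumn);
        System.out.println(deleteFamily);
      }
    }
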
http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
index 07b4b15..9ced0ac 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java
@@ -17,10 +17,10 @@
  */
 package org.apache.crunch.io.hbase;
 
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -62,7 +63,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
    * a more general purpose utility; it accounts for the presence of metadata files created
    * in the way we're doing exports.
    */
-  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
     public boolean accept(Path p) {
       String name = p.getName();
       return !name.startsWith("_") && !name.startsWith(".");
@@ -86,6 +87,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
     private byte[] stopRow = null;
     private boolean reachedStopRow = false;
     private long count;
+    private boolean seeked = false;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
@@ -93,16 +95,8 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
       conf = context.getConfiguration();
       Path path = fileSplit.getPath();
       FileSystem fs = path.getFileSystem(conf);
-
-      long fileSize = fs.getFileStatus(path).getLen();
-
-      // Open the underlying input stream; it will be closed by the HFileReader below.
-      FSDataInputStream iStream = fs.open(path);
-      FixedFileTrailer fileTrailer = FixedFileTrailer.readFromStream(iStream, fileSize);
-
-      // If this class is generalized, it may need to account for different data block encodings.
-      this.in = new HFileReaderV2(path, fileTrailer, iStream, iStream, fileSize, true, new CacheConfig(conf),
-          DataBlockEncoding.NONE, new HFileSystem(fs));
+      LOG.info("Initialize HFileRecordReader for " + path);
+      this.in = HFile.createReader(fs, path, new CacheConfig(conf));
 
       // The file info must be loaded before the scanner can be used.
       // This seems like a bug in HBase, but it's easily worked around.
@@ -133,15 +127,16 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
         return false;
       }
       boolean hasNext;
-      if (!scanner.isSeeked()) {
+      if (!seeked) {
         if (startRow != null) {
           LOG.info("Seeking to start row " + Bytes.toStringBinary(startRow));
           KeyValue kv = KeyValue.createFirstOnRow(startRow);
-          hasNext = (scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()) >= 0);
+          hasNext = seekAtOrAfter(scanner, kv);
         } else {
           LOG.info("Seeking to start");
           hasNext = scanner.seekTo();
         }
+        seeked = true;
       } else {
         hasNext = scanner.next();
       }
@@ -182,18 +177,34 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> {
     public void close() throws IOException {
       in.close();
     }
+
+    // This method is copied from o.a.h.hbase.regionserver.StoreFileScanner, as we don't want
+    // to depend on it.
+    private static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
+        throws IOException {
+      int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
+      if(result < 0) {
+        // Passed KV is smaller than first KV in file, work from start of file
+        return s.seekTo();
+      } else if(result > 0) {
+        // Passed KV is larger than current KV in file, if there is a next
+        // it is the "after", if not then this scanner is done.
+        return s.next();
+      }
+      // Seeked to the exact key
+      return true;
+    }
   }
 
   @Override
   protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
-    FileSystem fs = FileSystem.get(job.getConfiguration());
-
     // Explode out directories that match the original FileInputFormat filters since HFiles are written to directories where the
     // directory name is the column name
     for (FileStatus status : super.listStatus(job)) {
       if (status.isDir()) {
+        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
           result.add(match);
         }
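
The seekAtOrAfter helper exists because HFileScanner.seekTo(key) is three-way: negative means the requested key sorts before the first key in the file, zero is an exact hit, and positive leaves the scanner on the last key smaller than the requested one, so next() must be called to reach the "at or after" position. The replaced expression, hasNext = (seekTo(...) >= 0), treated the negative case as end-of-data and silently dropped any HFile whose keys all sort after the scan's start row — the testScanHFiles_startRowIsTooSmall case. The same contract over a plain sorted list, as a self-contained sanity check (the helper below is illustrative, not part of the patch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SeekAtOrAfterExample {
      // Returns the index of the first element >= key, or -1 if none exists,
      // mirroring how seekAtOrAfter interprets the three-way seekTo result.
      static int seekAtOrAfter(List<String> sortedKeys, String key) {
        int idx = Collections.binarySearch(sortedKeys, key);
        if (idx >= 0) {
          return idx;                  // exact match: seekTo == 0
        }
        int insertion = -idx - 1;
        if (insertion == 0) {
          return 0;                    // key before first element: seekTo < 0, restart at head
        }
        // Key after some element: seekTo > 0, step forward if anything remains.
        return insertion < sortedKeys.size() ? insertion : -1;
      }

      public static void main(String[] args) {
        List<String> keys = Arrays.asList("b", "d", "f");
        System.out.println(seekAtOrAfter(keys, "a")); // 0, lands on "b"
        System.out.println(seekAtOrAfter(keys, "d")); // 1, exact match
        System.out.println(seekAtOrAfter(keys, "e")); // 2, lands on "f"
        System.out.println(seekAtOrAfter(keys, "g")); // -1, scanner exhausted
      }
    }
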
http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 70f10d5..e7bca2b 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -19,10 +19,10 @@
  */
 package org.apache.crunch.io.hbase;
 
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index c1d15a2..31d314d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -20,11 +20,16 @@ package org.apache.crunch.io.hbase;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -37,6 +42,7 @@ import static org.apache.crunch.types.writable.Writables.writables;
 
 public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSource<KeyValue> {
 
+  private static final Log LOG = LogFactory.getLog(HFileSource.class);
   private static final PType<KeyValue> KEY_VALUE_PTYPE = writables(KeyValue.class);
 
   public HFileSource(Path path) {
@@ -79,4 +85,36 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou
   public String toString() {
     return "HFile(" + pathsAsString() + ")";
   }
+
+  @Override
+  public long getSize(Configuration conf) {
+    // HFiles are stored into <family>/<file>, but the default implementation does not support this.
+    // This is used for estimating the number of reducers. (Otherwise we will always get 1 reducer.)
+    long sum = 0;
+    for (Path path : getPaths()) {
+      try {
+        sum += getSizeInternal(conf, path);
+      } catch (IOException e) {
+        LOG.warn("Failed to estimate size of " + path);
+      }
+    }
+    return sum;
+  }
+
+  private long getSizeInternal(Configuration conf, Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] statuses = fs.listStatus(path, HFileInputFormat.HIDDEN_FILE_FILTER);
+    if (statuses == null) {
+      return 0;
+    }
+    long sum = 0;
+    for (FileStatus status : statuses) {
+      if (status.isDir()) {
+        sum += SourceTargetHelper.getPathSize(fs, status.getPath());
+      } else {
+        sum += status.getLen();
+      }
+    }
+    return sum;
+  }
 }
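
The new getSize override matters because Crunch uses the estimated input size to pick the degree of parallelism; per the comment in the patch, the inherited implementation cannot see through the <family>/<file> layout, so HFile-backed jobs always ended up with a single reducer. A rough standalone equivalent of the directory walk (only the plain FileSystem API is assumed; SourceTargetHelper.getPathSize is the real helper used above):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PathSizeExample {
      // Recursively totals file lengths under a path, descending into
      // per-family subdirectories the way getSizeInternal does.
      static long pathSize(FileSystem fs, Path path) throws IOException {
        long sum = 0;
        for (FileStatus status : fs.listStatus(path)) {
          sum += status.isDir() ? pathSize(fs, status.getPath()) : status.getLen();
        }
        return sum;
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        System.out.println(pathSize(fs, new Path(args[0])));
      }
    }
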
http://git-wip-us.apache.org/repos/asf/crunch/blob/013f2e19/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index e2f1520..2428c16 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.hbase;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.DoFn;
@@ -53,6 +54,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -67,6 +69,51 @@ public final class HFileUtils {
 
   private static final Log LOG = LogFactory.getLog(HFileUtils.class);
 
+  /** Compares {@code KeyValue} by its family, qualifier, timestamp (reversely), type (reversely) and memstoreTS. */
+  private static final Comparator<KeyValue> KEY_VALUE_COMPARATOR = new Comparator<KeyValue>() {
+    @Override
+    public int compare(KeyValue l, KeyValue r) {
+      int cmp;
+      if ((cmp = compareFamily(l, r)) != 0) {
+        return cmp;
+      }
+      if ((cmp = compareQualifier(l, r)) != 0) {
+        return cmp;
+      }
+      if ((cmp = compareTimestamp(l, r)) != 0) {
+        return cmp;
+      }
+      if ((cmp = compareType(l, r)) != 0) {
+        return cmp;
+      }
+      return compareMemstoreTS(l, r);
+    }
+
+    private int compareFamily(KeyValue l, KeyValue r) {
+      return Bytes.compareTo(
+          l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
+          r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength());
+    }
+
+    private int compareQualifier(KeyValue l, KeyValue r) {
+      return Bytes.compareTo(
+          l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
+          r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
+    }
+
+    private int compareTimestamp(KeyValue l, KeyValue r) {
+      return -Longs.compare(l.getTimestamp(), r.getTimestamp());
+    }
+
+    private int compareType(KeyValue l, KeyValue r) {
+      return (int) r.getType() - (int) l.getType();
+    }
+
+    private int compareMemstoreTS(KeyValue l, KeyValue r) {
+      return Longs.compare(l.getMemstoreTS(), r.getMemstoreTS());
+    }
+  };
+
   private static class FilterByFamilyFn extends FilterFn<KeyValue> {
 
     private final byte[] family;
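
KEY_VALUE_COMPARATOR sorts within a single row (createResult only ever sees one row at a time, so the row itself is never compared) by family, then qualifier, then timestamp descending, then type code descending, and finally memstoreTS. The descending type comparison is what makes a delete marker sort ahead of a Put it ties with on family, qualifier and timestamp; in 0.94-era HBase the codes are Put = 4, Delete = 8, DeleteColumn = 12 and DeleteFamily = 14. A tiny demonstration of that tie-break, with plain integers standing in for KeyValue.Type codes:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class TypeOrderExample {
      public static void main(String[] args) {
        // Type codes: Put = 4, Delete = 8, DeleteColumn = 12, DeleteFamily = 14.
        List<Integer> typeCodes = Arrays.asList(4, 8, 14, 12);
        // compareType returns r.getType() - l.getType(): higher codes first, so
        // any delete marker precedes a Put that ties on family/qualifier/timestamp.
        Collections.sort(typeCodes, Collections.<Integer>reverseOrder());
        System.out.println(typeCodes); // [14, 12, 8, 4]
      }
    }
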
@@ -241,10 +288,12 @@ public final class HFileUtils {
    * @see #combineIntoRow(org.apache.crunch.PCollection, org.apache.hadoop.hbase.client.Scan)
    */
   public static PCollection<Result> scanHFiles(Pipeline pipeline, Path path, Scan scan) {
-    // TODO(chaoshi): HFileInputFormat may skip some HFiles if their KVs do not fall into
-    // the range specified by this scan.
-    PCollection<KeyValue> in = pipeline.read(new HFileSource(ImmutableList.of(path), scan));
-    return combineIntoRow(in, scan);
+    return scanHFiles(pipeline, ImmutableList.of(path), scan);
+  }
+
+  public static PCollection<Result> scanHFiles(Pipeline pipeline, List<Path> paths, Scan scan) {
+    PCollection<KeyValue> in = pipeline.read(new HFileSource(paths, scan));
+    return combineIntoRow(in, scan);
   }
 
   public static PCollection<Result> combineIntoRow(PCollection<KeyValue> kvs) {
@@ -393,8 +442,8 @@ public final class HFileUtils {
 
     kvs = maybeDeleteFamily(kvs);
 
-    // In-place sort KeyValues by family, qualifier and then timestamp reversely.
-    Collections.sort(kvs, KeyValue.COMPARATOR);
+    // In-place sort KeyValues by family, qualifier and then timestamp reversely (whenever ties, deletes appear first).
+    Collections.sort(kvs, KEY_VALUE_COMPARATOR);
 
     List<KeyValue> results = Lists.newArrayListWithCapacity(kvs.size());
     for (int i = 0, j; i < kvs.size(); i = j) {
@@ -404,6 +453,9 @@
       }
       results.addAll(getLatestKeyValuesOfColumn(kvs.subList(i, j), versions));
     }
+    if (results.isEmpty()) {
+      return null;
+    }
     return new Result(results);
   }
 
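
Returning null instead of new Result(results) for an all-deleted row gives the caller a way to drop the row entirely rather than emit an empty Result; that is what lets testScanHFiles_deleteColumn observe zero results. A sketch of the expected calling pattern (the DoFn below is illustrative; the commit's actual caller is the private combineIntoRow machinery, which this hunk does not show):

    import java.util.List;

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Result;

    // Illustrative only: emit one Result per row, skipping rows whose cells
    // were all masked by delete markers (i.e. createResult returned null).
    abstract class CombineIntoRowFn extends DoFn<List<KeyValue>, Result> {

      // Stand-in for the private HFileUtils.createResult(kvs, versions).
      protected abstract Result createResult(List<KeyValue> rowKvs);

      @Override
      public void process(List<KeyValue> rowKvs, Emitter<Result> emitter) {
        Result result = createResult(rowKvs);
        if (result != null) { // null means every cell in the row was deleted
          emitter.emit(result);
        }
      }
    }
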
@@ -412,7 +464,7 @@
    * delete family timestamp. Also removes the delete family {@code KeyValue}s.
    */
   private static List<KeyValue> maybeDeleteFamily(List<KeyValue> kvs) {
-    long deleteFamilyCut = 0;
+    long deleteFamilyCut = -1;
     for (KeyValue kv : kvs) {
       if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
         deleteFamilyCut = Math.max(deleteFamilyCut, kv.getTimestamp());
@@ -437,7 +489,7 @@
   private static boolean hasSameFamilyAndQualifier(KeyValue l, KeyValue r) {
     return Bytes.equals(
         l.getBuffer(), l.getFamilyOffset(), l.getFamilyLength(),
-        r.getBuffer(), r.getFamilyOffset(), r.getFamilyOffset())
+        r.getBuffer(), r.getFamilyOffset(), r.getFamilyLength())
         && Bytes.equals(
         l.getBuffer(), l.getQualifierOffset(), l.getQualifierLength(),
         r.getBuffer(), r.getQualifierOffset(), r.getQualifierLength());
@@ -467,9 +519,10 @@
       }
       if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
         break;
-      } else if (kv.getType() == KeyValue.Type.Put.getCode()
-          && kv.getTimestamp() != previousDeleteTimestamp) {
-        results.add(kv);
+      } else if (kv.getType() == KeyValue.Type.Put.getCode()) {
+        if (kv.getTimestamp() != previousDeleteTimestamp) {
+          results.add(kv);
+        }
       } else if (kv.getType() == KeyValue.Type.Delete.getCode()) {
         previousDeleteTimestamp = kv.getTimestamp();
       } else {
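
Two of these hunks fix outright bugs. In hasSameFamilyAndQualifier, the right-hand side passed r.getFamilyOffset() where a length was required, so the family comparison was simply wrong. In maybeDeleteFamily, the sentinel for "no DeleteFamily seen" moves from 0 to -1: assuming the masking test elsewhere in the method is "drop cells with timestamp <= deleteFamilyCut" (the hunk does not show it), a sentinel of 0 would wrongly mask legitimate cells written at timestamp 0, which is exactly the timestamp the new start-row tests use. A simplified model of that cut logic:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DeleteFamilyCutExample {
      // Cells modeled as {timestamp, isDeleteFamily}; assumes the real code keeps
      // only puts newer than the largest DeleteFamily timestamp, which is why the
      // "no tombstone" sentinel must be -1: with 0, a put at timestamp 0 would vanish.
      static List<long[]> applyDeleteFamily(List<long[]> cells) {
        long deleteFamilyCut = -1;
        for (long[] cell : cells) {
          if (cell[1] == 1) {
            deleteFamilyCut = Math.max(deleteFamilyCut, cell[0]);
          }
        }
        List<long[]> kept = new ArrayList<long[]>();
        for (long[] cell : cells) {
          if (cell[1] == 0 && cell[0] > deleteFamilyCut) {
            kept.add(cell);
          }
        }
        return kept;
      }

      public static void main(String[] args) {
        // A put at timestamp 0 with no DeleteFamily present must survive.
        List<long[]> cells = Arrays.asList(new long[] {0, 0});
        System.out.println(applyDeleteFamily(cells).size()); // prints 1
      }
    }
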
