Repository: crunch
Updated Branches:
  refs/heads/master 49e457559 -> c09c4ee2d
CRUNCH-608 Write Bloom filters in HFiles Use a correctly-configured StoreFile.Writer (instead of HFile.Writer) for writing HFiles so that Bloom filter data is also included in the written HFiles. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c09c4ee2 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c09c4ee2 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c09c4ee2 Branch: refs/heads/master Commit: c09c4ee2de992b50c15d2cb91a3e6e22c88fb0b1 Parents: 49e4575 Author: Gabriel Reid <[email protected]> Authored: Tue May 10 11:02:11 2016 +0200 Committer: Gabriel Reid <[email protected]> Committed: Tue May 10 11:02:11 2016 +0200 ---------------------------------------------------------------------- .../org/apache/crunch/io/hbase/HFileTargetIT.java | 16 ++++++++++++++++ .../crunch/io/hbase/HFileOutputFormatForCrunch.java | 11 ++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/c09c4ee2/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java index c78ae75..af24865 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java @@ -66,10 +66,14 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.KeyValueHeap; import 
org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.util.BloomFilter; +import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.ByteBloomFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; @@ -81,6 +85,7 @@ import org.junit.Rule; import org.junit.Test; import java.io.BufferedReader; +import java.io.DataInput; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -96,7 +101,9 @@ import java.util.Random; import static org.apache.crunch.types.writable.Writables.nulls; import static org.apache.crunch.types.writable.Writables.tableOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -277,6 +284,7 @@ public class HFileTargetIT implements Serializable { Path outputPath = getTempPathOnHDFS("out"); HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY); hcol.setDataBlockEncoding(newBlockEncoding); + hcol.setBloomFilterType(BloomType.ROWCOL); HTable testTable = createTable(26, hcol); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); @@ -303,6 +311,14 @@ public class HFileTargetIT implements Serializable { try { reader = HFile.createReader(fs, f, new CacheConfig(conf), conf); assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding()); + + BloomType bloomFilterType = BloomType.valueOf(Bytes.toString( + reader.loadFileInfo().get(StoreFile.BLOOM_FILTER_TYPE_KEY))); + assertEquals(BloomType.ROWCOL, bloomFilterType); + DataInput bloomMeta = reader.getGeneralBloomFilterMetadata(); + 
assertNotNull(bloomMeta); + BloomFilter bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader); + assertNotNull(bloomFilter); } finally { if (reader != null) { reader.close(); http://git-wip-us.apache.org/repos/asf/crunch/blob/c09c4ee2/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java index 7611235..0b6ae2f 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java @@ -30,8 +30,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Bytes; @@ -89,11 +91,14 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> { hcol.readFields(new DataInputStream(new ByteArrayInputStream(hcolBytes))); LOG.info("Output path: {}", outputPath); LOG.info("HColumnDescriptor: {}", hcol.toString()); - final HFile.Writer writer = HFile.getWriterFactoryNoCache(conf) - .withPath(fs, outputPath) + Configuration noCacheConf = new Configuration(conf); + noCacheConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + final StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(noCacheConf), fs) .withComparator(KeyValue.COMPARATOR) 
.withFileContext(getContext(hcol)) - .create(); + .withFilePath(outputPath) + .withBloomType(hcol.getBloomFilterType()) + .build(); return new RecordWriter<Object, Cell>() { @Override
