Updated Branches:
  refs/heads/master 1eae50c1a -> 6b994e3bb
CRUNCH-255: HFileOutputFormatForCrunch should use configuration from table for compression, block encoding, block size...


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6b994e3b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6b994e3b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6b994e3b

Branch: refs/heads/master
Commit: 6b994e3bb484235b9d8e62cdf13cf924c993d3fc
Parents: 1eae50c
Author: Chao Shi <[email protected]>
Authored: Wed Aug 21 17:56:11 2013 +0800
Committer: Chao Shi <[email protected]>
Committed: Thu Aug 22 16:17:11 2013 +0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileTargetIT.java   | 84 +++++++++++++++++---
 .../io/hbase/HFileOutputFormatForCrunch.java    | 68 +++++++---------
 .../org/apache/crunch/io/hbase/HFileTarget.java | 44 +++++++++-
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 22 ++++-
 4 files changed, 167 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 667b5ad..aac9e317 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -47,8 +47,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -107,9 +109,13 @@ public class HFileTargetIT implements Serializable {
   }
 
   private static HTable createTable(int splits) throws IOException {
+    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+    return createTable(splits, hcol);
+  }
+
+  private static HTable createTable(int splits, HColumnDescriptor hcol) throws IOException {
     byte[] tableName = Bytes.toBytes("test_table_" + tableCounter);
     HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
-    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
     HTableDescriptor htable = new HTableDescriptor(tableName);
     htable.addFamily(hcol);
     admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
@@ -166,8 +172,8 @@ public class HFileTargetIT implements Serializable {
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
-    PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
-    pipeline.write(wordCountKvs, ToHBase.hfile(outputPath));
+    PCollection<KeyValue> wordCountKeyValues = convertToKeyValues(wordCounts);
+    pipeline.write(wordCountKeyValues, ToHBase.hfile(outputPath));
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
 
@@ -188,9 +194,9 @@ public class HFileTargetIT implements Serializable {
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
-    PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
-    HFileUtils.writeToHFilesForIncrementalLoad(
-        wordCountKvs,
+    PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        wordCountPuts,
         testTable,
         outputPath);
 
@@ -231,12 +237,12 @@ public class HFileTargetIT implements Serializable {
     PCollection<String> longWords = words.filter(FilterFns.not(SHORT_WORD_FILTER));
     PTable<String, Long> shortWordCounts = shortWords.count();
     PTable<String, Long> longWordCounts = longWords.count();
-    HFileUtils.writeToHFilesForIncrementalLoad(
-        convertToKeyValues(shortWordCounts),
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        convertToPuts(shortWordCounts),
         table1,
         outputPath1);
-    HFileUtils.writeToHFilesForIncrementalLoad(
-        convertToKeyValues(longWordCounts),
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        convertToPuts(longWordCounts),
         table2,
         outputPath2);
 
@@ -245,11 +251,67 @@ public class HFileTargetIT implements Serializable {
     loader.doBulkLoad(outputPath1, table1);
     loader.doBulkLoad(outputPath2, table2);
 
-    FileSystem fs = FileSystem.get(conf);
     assertEquals(396L, getWordCountFromTable(table1, "of"));
     assertEquals(427L, getWordCountFromTable(table2, "and"));
   }
 
+  @Test
+  public void testHFileUsesFamilyConfig() throws IOException {
+    DataBlockEncoding newBlockEncoding = DataBlockEncoding.PREFIX;
+    assertTrue(newBlockEncoding != DataBlockEncoding.valueOf(HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING));
+
+    Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+    Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
+    Path inputPath = copyResourceFileToHDFS("shakes.txt");
+    Path outputPath = getTempPathOnHDFS("out");
+    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+    hcol.setDataBlockEncoding(newBlockEncoding);
+    HTable testTable = createTable(10, hcol);
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<String> words = split(shakespeare, "\\s+");
+    PTable<String,Long> wordCounts = words.count();
+    PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        wordCountPuts,
+        testTable,
+        outputPath);
+
+    PipelineResult result = pipeline.run();
+    assertTrue(result.succeeded());
+
+    int hfilesCount = 0;
+    FileSystem fs = outputPath.getFileSystem(conf);
+    for (FileStatus e : fs.listStatus(new Path(outputPath, Bytes.toString(TEST_FAMILY)))) {
+      Path f = e.getPath();
+      if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
+        continue;
+      }
+      HFile.Reader reader = null;
+      try {
+        reader = HFile.createReader(fs, f, new CacheConfig(conf));
+        assertEquals(DataBlockEncoding.PREFIX, reader.getEncodingOnDisk());
+      } finally {
+        reader.close();
+      }
+      hfilesCount++;
+    }
+    assertTrue(hfilesCount > 0);
+  }
+
+  private PCollection<Put> convertToPuts(PTable<String, Long> in) {
+    return in.parallelDo(new MapFn<Pair<String, Long>, Put>() {
+      @Override
+      public Put map(Pair<String, Long> input) {
+        String w = input.first();
+        long c = input.second();
+        Put p = new Put(Bytes.toBytes(w));
+        p.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(c));
+        return p;
+      }
+    }, Writables.writables(Put.class));
+  }
+
   private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
     return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() {
       @Override
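
The hunk above is cut off inside convertToKeyValues. A minimal sketch of such a helper (not taken from this commit; it assumes one KeyValue per word under TEST_FAMILY:TEST_QUALIFIER, mirroring convertToPuts, and it sits inside HFileTargetIT so the test's imports and constants are in scope):

    // Sketch only -- the real helper already exists in HFileTargetIT and is
    // merely shown as truncated context in the diff above.
    private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
      return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() {
        @Override
        public KeyValue map(Pair<String, Long> input) {
          byte[] row = Bytes.toBytes(input.first());     // row key: the word
          byte[] value = Bytes.toBytes(input.second());  // value: its count
          return new KeyValue(row, TEST_FAMILY, TEST_QUALIFIER, value);
        }
      }, Writables.writables(KeyValue.class));
    }

Unlike convertToPuts, this path builds KeyValues directly, so it still feeds the lower-level ToHBase.hfile(...) target used by the first test.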


http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 311d91c..70f10d5 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -19,17 +19,18 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 
 /**
@@ -53,13 +56,10 @@ import java.io.IOException;
  */
 public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValue> {
 
-  private static final String COMPACTION_EXCLUDE_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.compaction.exclude";
-  private static final String DATABLOCK_ENCODING_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-  private static final String BLOCK_SIZE_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.blocksize";
-  private static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+  public static final String HCOLUMN_DESCRIPTOR_KEY = "hbase.hfileoutputformat.column.descriptor";
+  private static final String COMPACTION_EXCLUDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.compaction.exclude";
+  private static final Log LOG = LogFactory.getLog(HFileOutputFormatForCrunch.class);
+
   private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
   private final TimeRangeTracker trt = new TimeRangeTracker();
 
@@ -68,20 +68,30 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
     Path outputPath = getDefaultWorkFile(context, "");
     Configuration conf = context.getConfiguration();
     FileSystem fs = outputPath.getFileSystem(conf);
-    int blocksize = conf.getInt(BLOCK_SIZE_CONF_KEY,
-        HFile.DEFAULT_BLOCKSIZE);
-    String compression = conf.get(
-        COMPRESSION_CONF_KEY, Compression.Algorithm.NONE.getName());
+
     final boolean compactionExclude = conf.getBoolean(
         COMPACTION_EXCLUDE_CONF_KEY, false);
-    HFileDataBlockEncoder encoder = getDataBlockEncoder(
-        conf.get(DATABLOCK_ENCODING_CONF_KEY));
+
+    String hcolStr = conf.get(HCOLUMN_DESCRIPTOR_KEY);
+    if (hcolStr == null) {
+      throw new AssertionError(HCOLUMN_DESCRIPTOR_KEY + " is not set in conf");
+    }
+    byte[] hcolBytes;
+    try {
+      hcolBytes = Hex.decodeHex(hcolStr.toCharArray());
+    } catch (DecoderException e) {
+      throw new AssertionError("Bad hex string: " + hcolStr);
+    }
+    HColumnDescriptor hcol = new HColumnDescriptor();
+    hcol.readFields(new DataInputStream(new ByteArrayInputStream(hcolBytes)));
+    LOG.info("Output path: " + outputPath);
+    LOG.info("HColumnDescriptor: " + hcol.toString());
     final HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
         .withPath(fs, outputPath)
-        .withBlockSize(blocksize)
-        .withCompression(compression)
+        .withBlockSize(hcol.getBlocksize())
+        .withCompression(hcol.getCompression())
         .withComparator(KeyValue.KEY_COMPARATOR)
-        .withDataBlockEncoder(encoder)
+        .withDataBlockEncoder(new HFileDataBlockEncoderImpl(hcol.getDataBlockEncoding()))
         .withChecksumType(Store.getChecksumType(conf))
         .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
        .create();
@@ -112,22 +122,4 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
       }
     };
   }
-
-  private HFileDataBlockEncoder getDataBlockEncoder(String dataBlockEncodingStr) {
-    final HFileDataBlockEncoder encoder;
-    if (dataBlockEncodingStr == null) {
-      encoder = NoOpDataBlockEncoder.INSTANCE;
-    } else {
-      try {
-        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
-            .valueOf(dataBlockEncodingStr));
-      } catch (IllegalArgumentException ex) {
-        throw new RuntimeException(
-            "Invalid data block encoding type configured for the param "
-                + DATABLOCK_ENCODING_CONF_KEY + " : "
-                + dataBlockEncodingStr);
-      }
-    }
-    return encoder;
-  }
 }
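
For readers following the new plumbing: the family descriptor travels through the job Configuration as a hex-encoded Writable. A minimal standalone sketch of that round trip (assuming HBase 0.94-era APIs; not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;

    import org.apache.commons.codec.binary.Hex;
    import org.apache.crunch.io.hbase.HFileOutputFormatForCrunch;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.io.WritableUtils;

    public class HColumnDescriptorRoundTrip {
      public static void main(String[] args) throws Exception {
        // Producer side (what HFileTarget does): serialize the family descriptor
        // as a Writable and store it in the Configuration as a hex string.
        HColumnDescriptor out = new HColumnDescriptor("f");
        out.setDataBlockEncoding(DataBlockEncoding.PREFIX);
        Configuration conf = new Configuration();
        conf.set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY,
            Hex.encodeHexString(WritableUtils.toByteArray(out)));

        // Consumer side (what getRecordWriter does above): decode the hex string,
        // rehydrate the descriptor, and read the per-family settings from it.
        byte[] bytes = Hex.decodeHex(
            conf.get(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY).toCharArray());
        HColumnDescriptor in = new HColumnDescriptor();
        in.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        System.out.println(in.getCompression() + " / "
            + in.getBlocksize() + " / " + in.getDataBlockEncoding());
      }
    }

Because HColumnDescriptor is itself a Writable, whatever the table's family carries (compression, block size, data block encoding) survives the trip without adding one conf key per setting.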


http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 0038394..bc51b2c 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,20 +17,62 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class HFileTarget extends FileTargetImpl {
 
-  // TODO(chaoshi): configurable compression algorithm, block size, data block encoder for hfile...
+  private static final HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor();
+  private final HColumnDescriptor hcol;
 
   public HFileTarget(String path) {
     this(new Path(path));
   }
 
   public HFileTarget(Path path) {
+    this(path, DEFAULT_COLUMN_DESCRIPTOR);
+  }
+
+  public HFileTarget(Path path, HColumnDescriptor hcol) {
     super(path, HFileOutputFormatForCrunch.class, new SequentialFileNamingScheme());
+    this.hcol = Preconditions.checkNotNull(hcol);
+  }
+
+  @Override
+  protected void configureForMapReduce(
+      Job job,
+      Class keyClass,
+      Class valueClass,
+      Class outputFormatClass,
+      Path outputPath,
+      String name) {
+    try {
+      FileOutputFormat.setOutputPath(job, outputPath);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    String hcolStr = Hex.encodeHexString(WritableUtils.toByteArray(hcol));
+    if (name == null) {
+      job.setOutputFormatClass(HFileOutputFormatForCrunch.class);
+      job.setOutputKeyClass(keyClass);
+      job.setOutputValueClass(valueClass);
+      job.getConfiguration().set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
+    } else {
+      FormatBundle<HFileOutputFormatForCrunch> bundle = FormatBundle.forOutput(HFileOutputFormatForCrunch.class);
+      bundle.set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
+      CrunchOutputs.addNamedOutput(job, name, bundle, keyClass, valueClass);
+    }
   }
 
   @Override
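
The new HFileTarget(Path, HColumnDescriptor) constructor is the user-facing end of this change. A minimal sketch of how a caller might set per-family compression, block size, and encoding (class and method names below are illustrative, not from this commit; the KeyValues are assumed to already be sorted and partitioned, e.g. via HFileUtils.sortAndPartition):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.io.hbase.HFileTarget;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.Compression;

    public class HFileTargetExample {
      public static void writeWithFamilySettings(PCollection<KeyValue> sortedKvs, Path out) {
        HColumnDescriptor hcol = new HColumnDescriptor("content");
        hcol.setCompressionType(Compression.Algorithm.GZ);    // hfile compression
        hcol.setBlocksize(64 * 1024);                         // hfile block size
        hcol.setDataBlockEncoding(DataBlockEncoding.PREFIX);  // data block encoding
        // The descriptor is hex-encoded into the job conf and applied by
        // HFileOutputFormatForCrunch when the HFiles are written.
        sortedKvs.write(new HFileTarget(out, hcol));
      }
    }

writeToHFilesForIncrementalLoad does this wiring automatically, passing each family's HColumnDescriptor from the HTable, so constructing the target by hand is only needed when writing HFiles without a live table to copy the settings from.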


http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 5e07a67..d026555 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.sun.org.apache.commons.logging.Log;
 import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
@@ -34,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
@@ -104,10 +107,27 @@ public final class HFileUtils {
       byte[] family = f.getName();
       PCollection<KeyValue> sorted = sortAndPartition(
           kvs.filter(new FilterByFamilyFn(family)), table);
-      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family))));
+      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
     }
   }
 
+  public static void writePutsToHFilesForIncrementalLoad(
+      PCollection<Put> puts,
+      HTable table,
+      Path outputPath) throws IOException {
+    PCollection<KeyValue> kvs = puts.parallelDo("ConvertPutToKeyValue", new DoFn<Put, KeyValue>() {
+      @Override
+      public void process(Put input, Emitter<KeyValue> emitter) {
+        for (List<KeyValue> keyValues : input.getFamilyMap().values()) {
+          for (KeyValue keyValue : keyValues) {
+            emitter.emit(keyValue);
+          }
+        }
+      }
+    }, writables(KeyValue.class));
+    writeToHFilesForIncrementalLoad(kvs, table, outputPath);
+  }
+
   public static PCollection<KeyValue> sortAndPartition(PCollection<KeyValue> kvs, HTable table) throws IOException {
     Configuration conf = kvs.getPipeline().getConfiguration();
     PTable<KeyValue, Void> t = kvs.parallelDo(new MapFn<KeyValue, Pair<KeyValue, Void>>() {
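
Taken together, typical use of the new Put-based helper mirrors the integration test above. A sketch only (class, method, and variable names here are illustrative; the pipeline, table, and Configuration are assumed to exist):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.hbase.HFileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadExample {
      public static void bulkLoad(Pipeline pipeline, PCollection<Put> puts,
          HTable table, Path workDir, Configuration conf) throws Exception {
        // Write one HFile directory per column family, each configured from the
        // table's HColumnDescriptor (compression, block size, block encoding).
        HFileUtils.writePutsToHFilesForIncrementalLoad(puts, table, workDir);
        if (!pipeline.run().succeeded()) {
          throw new IllegalStateException("pipeline failed, not bulk loading");
        }
        // Hand the generated HFiles to HBase, as in HFileTargetIT above.
        new LoadIncrementalHFiles(conf).doBulkLoad(workDir, table);
      }
    }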
