Repository: crunch
Updated Branches:
  refs/heads/master b6accf4e3 -> de1553e73
CRUNCH-545 Use a single job for writing to HFiles

Filter Cells on column family after they have been sorted and
partitioned. This ensures that the partitioning and writing to HFiles
will all occur in a single job, instead of one job per column family
that is defined in the table.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b2e6d169
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b2e6d169
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b2e6d169

Branch: refs/heads/master
Commit: b2e6d169b3492d9a04c8a76ca771f586bbaa760d
Parents: b6accf4
Author: Gabriel Reid <[email protected]>
Authored: Sun Jul 19 16:12:52 2015 +0200
Committer: Gabriel Reid <[email protected]>
Committed: Mon Jul 20 15:12:11 2015 +0200

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileTargetIT.java   | 30 ++++++++++++++------
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 25 ++++++++++------
 2 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b2e6d169/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index ddb1292..5bcc51c 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -137,11 +137,13 @@ public class HFileTargetIT implements Serializable {
     return createTable(splits, hcol);
   }
 
-  private static HTable createTable(int splits, HColumnDescriptor hcol) throws Exception {
+  private static HTable createTable(int splits, HColumnDescriptor... hcols) throws Exception {
     byte[] tableName = Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000));
     HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
     HTableDescriptor htable = new HTableDescriptor(tableName);
-    htable.addFamily(hcol);
+    for (HColumnDescriptor hcol : hcols) {
+      htable.addFamily(hcol);
+    }
     admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
     HBASE_TEST_UTILITY.waitTableAvailable(tableName, 30000);
     return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName);
@@ -182,12 +184,13 @@ public class HFileTargetIT implements Serializable {
     Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration());
     Path inputPath = copyResourceFileToHDFS("shakes.txt");
     Path outputPath = getTempPathOnHDFS("out");
-    HTable testTable = createTable(26);
-
+    byte[] columnFamilyA = Bytes.toBytes("colfamA");
+    byte[] columnFamilyB = Bytes.toBytes("colfamB");
+    HTable testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB));
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
-    PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+    PCollection<Put> wordCountPuts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB);
     HFileUtils.writePutsToHFilesForIncrementalLoad(
         wordCountPuts,
         testTable,
@@ -208,8 +211,8 @@
         .build();
 
     for (Map.Entry<String, Long> e : EXPECTED.entrySet()) {
-      long actual = getWordCountFromTable(testTable, e.getKey());
-      assertEquals((long) e.getValue(), actual);
+      assertEquals((long) e.getValue(), getWordCountFromTable(testTable, columnFamilyA, e.getKey()));
+      assertEquals((long) e.getValue(), getWordCountFromTable(testTable, columnFamilyB, e.getKey()));
     }
   }
 
@@ -296,6 +299,10 @@ public class HFileTargetIT implements Serializable {
   }
 
   private static PCollection<Put> convertToPuts(PTable<String, Long> in) {
+    return convertToPuts(in, TEST_FAMILY);
+  }
+
+  private static PCollection<Put> convertToPuts(PTable<String, Long> in, final byte[]...columnFamilies) {
     return in.parallelDo(new MapFn<Pair<String, Long>, Put>() {
       @Override
       public Put map(Pair<String, Long> input) {
@@ -305,7 +312,9 @@ public class HFileTargetIT implements Serializable {
         }
         long c = input.second();
         Put p = new Put(Bytes.toBytes(w));
-        p.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(c));
+        for (byte[] columnFamily : columnFamilies) {
+          p.add(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c));
+        }
         return p;
       }
     }, HBaseTypes.puts());
@@ -394,7 +403,12 @@ public class HFileTargetIT implements Serializable {
   }
 
   private static long getWordCountFromTable(HTable table, String word) throws IOException {
+    return getWordCountFromTable(table, TEST_FAMILY, word);
+  }
+
+  private static long getWordCountFromTable(HTable table, byte[] columnFamily, String word) throws IOException {
     Get get = new Get(Bytes.toBytes(word));
+    get.addFamily(columnFamily);
     byte[] value = table.get(get).value();
     if (value == null) {
       fail("no such row: " + word);

http://git-wip-us.apache.org/repos/asf/crunch/blob/b2e6d169/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 34118ca..d18b65b 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -128,6 +128,11 @@ public final class HFileUtils {
     public boolean accept(C input) {
       return Bytes.equals(CellUtil.cloneFamily(input), family);
     }
+
+    @Override
+    public boolean disableDeepCopy() {
+      return true;
+    }
   }
 
   private static class StartRowFilterFn<C extends Cell> extends FilterFn<C> {
@@ -367,10 +372,12 @@ public final class HFileUtils {
       LOG.warn("{} has no column families", table);
       return;
     }
+    PCollection<C> partitioned = sortAndPartition(cells, table);
     for (HColumnDescriptor f : families) {
       byte[] family = f.getName();
-      PCollection<C> sorted = sortAndPartition(cells.filter(new FilterByFamilyFn<C>(family)), table);
-      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
+      partitioned
+          .filter(new FilterByFamilyFn<C>(family))
+          .write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
     }
   }
 
@@ -391,12 +398,14 @@
   public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException {
     Configuration conf = cells.getPipeline().getConfiguration();
-    PTable<C, Void> t = cells.parallelDo(new MapFn<C, Pair<C, Void>>() {
-      @Override
-      public Pair<C, Void> map(C input) {
-        return Pair.of(input, (Void) null);
-      }
-    }, tableOf(cells.getPType(), nulls()));
+    PTable<C, Void> t = cells.parallelDo(
+        "Pre-partition",
+        new MapFn<C, Pair<C, Void>>() {
+          @Override
+          public Pair<C, Void> map(C input) {
+            return Pair.of(input, (Void) null);
+          }
+        }, tableOf(cells.getPType(), nulls()));
     List<KeyValue> splitPoints = getSplitPoints(table);
     Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
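
For context, a minimal caller-side sketch of the bulk-load path this commit restructures, modeled on the updated HFileTargetIT test. The table handle, input/output paths, and the column-family and qualifier names ("colfamA", "colfamB", "count") are hypothetical stand-ins, as is the MultiFamilyBulkLoadSketch class itself. The point is that, with this change, the sort/partition step and the per-family HFile writes behind writePutsToHFilesForIncrementalLoad are planned as a single job instead of one job per column family.

// Sketch only: hypothetical names, mirroring the updated integration test.
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.hbase.HBaseTypes;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiFamilyBulkLoadSketch {

  public static void run(HTable table, Path inputPath, Path outputPath) throws Exception {
    Pipeline pipeline = new MRPipeline(MultiFamilyBulkLoadSketch.class, table.getConfiguration());

    // Hypothetical column families and qualifier; any families defined on the table would do.
    final byte[] colfamA = Bytes.toBytes("colfamA");
    final byte[] colfamB = Bytes.toBytes("colfamB");
    final byte[] qualifier = Bytes.toBytes("count");

    PCollection<String> words = pipeline.read(At.textFile(inputPath, Writables.strings()));
    PTable<String, Long> counts = words.count();

    // One Put per word, with the count written to both column families.
    PCollection<Put> puts = counts.parallelDo(new MapFn<Pair<String, Long>, Put>() {
      @Override
      public Put map(Pair<String, Long> input) {
        Put p = new Put(Bytes.toBytes(input.first()));
        p.add(colfamA, qualifier, Bytes.toBytes(input.second()));
        p.add(colfamB, qualifier, Bytes.toBytes(input.second()));
        return p;
      }
    }, HBaseTypes.puts());

    // After CRUNCH-545 this plans a single job: the cells are sorted and partitioned once,
    // then filtered per family and written to per-family HFile directories under outputPath.
    HFileUtils.writePutsToHFilesForIncrementalLoad(puts, table, outputPath);

    pipeline.done();
  }
}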

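A note on the disableDeepCopy() override added to FilterByFamilyFn above: the filter only compares the input Cell's family bytes and never modifies or retains the Cell, and with this change a single sorted-and-partitioned PCollection feeds one such filter per column family, so a reasonable reading is that the override lets Crunch skip its defensive per-record deep copy on this hot path. A minimal sketch of the same pattern follows; SingleFamilyFilterFn is a hypothetical stand-in, not the class from this commit.

// Sketch of the disableDeepCopy() pattern used by the commit's FilterByFamilyFn.
import org.apache.crunch.FilterFn;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class SingleFamilyFilterFn<C extends Cell> extends FilterFn<C> {

  private final byte[] family;

  public SingleFamilyFilterFn(byte[] family) {
    this.family = family;
  }

  @Override
  public boolean accept(C input) {
    // Only compares the family bytes; no reference to the Cell outlives this call.
    return Bytes.equals(CellUtil.cloneFamily(input), family);
  }

  @Override
  public boolean disableDeepCopy() {
    // accept() neither mutates nor holds on to the input record, so the
    // defensive deep copy Crunch would otherwise perform can be skipped.
    return true;
  }
}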