Updated Branches: refs/heads/master c10996a4d -> 891e28d0e
CRUNCH-251: Fix HFileUtils#sortAndPartition does not work when two instances exist in the same pipeline Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/891e28d0 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/891e28d0 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/891e28d0 Branch: refs/heads/master Commit: 891e28d0e0e9cd8da00bed28b33d4e448697b171 Parents: c10996a Author: Chao Shi <[email protected]> Authored: Wed Aug 21 10:45:57 2013 +0800 Committer: Chao Shi <[email protected]> Committed: Wed Aug 21 10:45:57 2013 +0800 ---------------------------------------------------------------------- .../apache/crunch/io/hbase/HFileTargetIT.java | 85 ++++++++++++++++---- .../org/apache/crunch/io/hbase/HFileUtils.java | 2 +- 2 files changed, 71 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/891e28d0/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java index 8c1b3f4..667b5ad 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java @@ -23,12 +23,14 @@ import com.google.common.io.Resources; import org.apache.commons.io.IOUtils; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import org.apache.crunch.FilterFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.PipelineResult; +import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mr.MRPipeline; import 
org.apache.crunch.io.At; import org.apache.crunch.test.TemporaryPath; @@ -76,10 +78,17 @@ import static org.junit.Assert.fail; public class HFileTargetIT implements Serializable { private static final HBaseTestingUtility HBASE_TEST_UTILITY = new HBaseTestingUtility(); - private static final byte[] TEST_TABLE = Bytes.toBytes("test_table"); private static final byte[] TEST_FAMILY = Bytes.toBytes("test_family"); private static final byte[] TEST_QUALIFIER = Bytes.toBytes("count"); private static final Path TEMP_DIR = new Path("/tmp"); + private static int tableCounter = 0; + + private static FilterFn<String> SHORT_WORD_FILTER = new FilterFn<String>() { + @Override + public boolean accept(String input) { + return input.length() <= 2; + } + }; @Rule public transient TemporaryPath tmpDir = TemporaryPaths.create(); @@ -90,16 +99,6 @@ public class HFileTargetIT implements Serializable { // (we will need it to test bulk load against multiple regions). HBASE_TEST_UTILITY.startMiniCluster(); HBASE_TEST_UTILITY.startMiniMapReduceCluster(); - HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin(); - HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY); - HTableDescriptor htable = new HTableDescriptor(TEST_TABLE); - htable.addFamily(hcol); - byte[][] splits = new byte[26][]; - for (int i = 0; i < 26; i++) { - byte b = (byte)('a' + i); - splits[i] = new byte[] { b }; - } - admin.createTable(htable, splits); // Set classpath for yarn, otherwise it won't be able to find MRAppMaster // (see CRUNCH-249 and HBASE-8528). 
@@ -107,6 +106,17 @@ public class HFileTargetIT implements Serializable { dirtyFixForJobHistoryServerAddress(); } + private static HTable createTable(int splits) throws IOException { + byte[] tableName = Bytes.toBytes("test_table_" + tableCounter); + HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin(); + HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY); + HTableDescriptor htable = new HTableDescriptor(tableName); + htable.addFamily(hcol); + admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits)); + tableCounter++; + return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName); + } + /** * We need to set the address of JobHistory server, as it randomly picks an unused port * to listen. Unfortunately, HBaseTestingUtility neither does that nor provides a way @@ -144,7 +154,6 @@ public class HFileTargetIT implements Serializable { public void setUp() throws IOException { FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration()); fs.delete(TEMP_DIR, true); - HBASE_TEST_UTILITY.truncateTable(TEST_TABLE); } @Test @@ -174,12 +183,12 @@ public class HFileTargetIT implements Serializable { Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf); Path inputPath = copyResourceFileToHDFS("shakes.txt"); Path outputPath = getTempPathOnHDFS("out"); + HTable testTable = createTable(26); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); PCollection<String> words = split(shakespeare, "\s+"); PTable<String,Long> wordCounts = words.count(); PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts); - HTable testTable = new HTable(HBASE_TEST_UTILITY.getConfiguration(), TEST_TABLE); HFileUtils.writeToHFilesForIncrementalLoad( wordCountKvs, testTable, @@ -199,11 +208,48 @@ public class HFileTargetIT implements Serializable { .put("to", 367L) .build(); for (Map.Entry<String, Long> e : EXPECTED.entrySet()) { - assertEquals((long) e.getValue(), Bytes.toLong( - 
testTable.get(new Get(Bytes.toBytes(e.getKey()))).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER).getValue())); + long actual = getWordCountFromTable(testTable, e.getKey()); + assertEquals((long) e.getValue(), actual); } } + /** @see <a href="https://issues.apache.org/jira/browse/CRUNCH-251">CRUNCH-251</a> */ + @Test + public void testMultipleHFileTargets() throws Exception { + Configuration conf = HBASE_TEST_UTILITY.getConfiguration(); + Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf); + Path inputPath = copyResourceFileToHDFS("shakes.txt"); + Path outputPath1 = getTempPathOnHDFS("out1"); + Path outputPath2 = getTempPathOnHDFS("out2"); + HTable table1 = createTable(10); + HTable table2 = createTable(20); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration()); + + PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); + PCollection<String> words = split(shakespeare, "\s+"); + PCollection<String> shortWords = words.filter(SHORT_WORD_FILTER); + PCollection<String> longWords = words.filter(FilterFns.not(SHORT_WORD_FILTER)); + PTable<String, Long> shortWordCounts = shortWords.count(); + PTable<String, Long> longWordCounts = longWords.count(); + HFileUtils.writeToHFilesForIncrementalLoad( + convertToKeyValues(shortWordCounts), + table1, + outputPath1); + HFileUtils.writeToHFilesForIncrementalLoad( + convertToKeyValues(longWordCounts), + table2, + outputPath2); + + PipelineResult result = pipeline.run(); + assertTrue(result.succeeded()); + loader.doBulkLoad(outputPath1, table1); + loader.doBulkLoad(outputPath2, table2); + + FileSystem fs = FileSystem.get(conf); + assertEquals(396L, getWordCountFromTable(table1, "of")); + assertEquals(427L, getWordCountFromTable(table2, "and")); + } + private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) { return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() { @Override @@ -281,4 +327,13 @@ public class HFileTargetIT implements Serializable { Path result = new Path(TEMP_DIR, 
fileName); return result.makeQualified(fs); } + + private long getWordCountFromTable(HTable table, String word) throws IOException { + Get get = new Get(Bytes.toBytes(word)); + KeyValue keyValue = table.get(get).getColumnLatest(TEST_FAMILY, TEST_QUALIFIER); + if (keyValue == null) { + fail("no such row: " + word); + } + return Bytes.toLong(keyValue.getValue()); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/891e28d0/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java index 2235538..5e07a67 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java @@ -119,9 +119,9 @@ public final class HFileUtils { List <KeyValue> splitPoints = getSplitPoints(table); Path partitionFile = new Path(((MRPipeline) kvs.getPipeline()).createTempPath(), "partition"); writePartitionInfo(conf, partitionFile, splitPoints); - TotalOrderPartitioner.setPartitionFile(conf, partitionFile); GroupingOptions options = GroupingOptions.builder() .partitionerClass(TotalOrderPartitioner.class) + .conf(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString()) .numReducers(splitPoints.size() + 1) .sortComparatorClass(KeyValueComparator.class) .build();
