CRUNCH-619: Update to HBase 2.0.1. Contributed by Attila Sasvari.
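The bulk of the diff below is a mechanical move from the HBase 1.x client classes (HTable, HBaseAdmin, Put.add) to the HBase 2.x Connection-based API. As a reading aid, here is a minimal standalone sketch of the target pattern; the class, table, family, and qualifier names are placeholders and are not taken from the commit:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBase2ClientSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // HBase 2.x removes HTable and HBaseAdmin; Table and Admin instances
        // are obtained from a shared Connection instead.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(TableName.valueOf("example_table"))) {
          // Admin.tableExists now takes a TableName rather than a String.
          if (!admin.tableExists(TableName.valueOf("example_table"))) {
            return;
          }
          Put put = new Put(Bytes.toBytes("row1"));
          // Put.add(family, qualifier, value) is gone in 2.0; use addColumn.
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
          table.put(put);
        }
      }
    }
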
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1b2c058c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1b2c058c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1b2c058c
Branch: refs/heads/master
Commit: 1b2c058c4c2b084d895cf6ae86f68ac935282f2c
Parents: ffca004
Author: Josh Wills <[email protected]>
Authored: Mon Jul 23 13:31:00 2018 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon Jul 23 13:31:00 2018 -0700

----------------------------------------------------------------------
 crunch-examples/pom.xml                         |   7 +-
 .../crunch/examples/WordAggregationHBase.java   |  33 ++---
 crunch-hbase/pom.xml                            |   6 +
 .../apache/crunch/io/hbase/HFileSourceIT.java   |  29 ++--
 .../apache/crunch/io/hbase/HFileTargetIT.java   |  99 ++++++------
 .../crunch/io/hbase/WordCountHBaseIT.java       |  41 +++---
 .../org/apache/crunch/io/hbase/HBaseData.java   |   1 -
 .../crunch/io/hbase/HBaseSourceTarget.java      |   6 +-
 .../org/apache/crunch/io/hbase/HBaseTypes.java  |   3 +-
 .../crunch/io/hbase/HFileInputFormat.java       |  15 ++-
 .../io/hbase/HFileOutputFormatForCrunch.java    | 132 +++++++------
 .../crunch/io/hbase/HFileReaderFactory.java     |   7 +-
 .../org/apache/crunch/io/hbase/HFileTarget.java |  18 +--
 .../org/apache/crunch/io/hbase/HFileUtils.java  |  84 ++++++------
 .../apache/crunch/io/hbase/HTableIterable.java  |   1 -
 .../apache/crunch/io/hbase/HTableIterator.java  |   4 +-
 crunch-hcatalog/pom.xml                         |  53 +++++++-
 crunch-spark/pom.xml                            |   9 +-
 .../org/apache/crunch/SparkHFileTargetIT.java   |  92 ++++++-----
 .../apache/crunch/SparkWordCountHBaseIT.java    |  26 ++--
 pom.xml                                         |  11 +-
 21 files changed, 378 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-examples/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-examples/pom.xml b/crunch-examples/pom.xml
index e28d553..adc6e9a 100644
--- a/crunch-examples/pom.xml
+++ b/crunch-examples/pom.xml
@@ -50,7 +50,6 @@ under the License.
       <scope>provided</scope>
     </dependency>
 
-
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
@@ -66,6 +65,12 @@ under the License.
<dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-mapreduce</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java index 5d62d19..b128b7f 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java @@ -40,13 +40,14 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.Tool; @@ -147,11 +148,13 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab * @throws IOException */ private static void putInHbase(List<Put> putList, Configuration conf) throws IOException { - HTable htable = new HTable(conf, TABLE_SOURCE); + Connection connection = ConnectionFactory.createConnection(conf); + Table htable = connection.getTable(TableName.valueOf(TABLE_SOURCE)); try { htable.put(putList); } finally { htable.close(); + connection.close(); } } @@ -161,16 +164,15 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab * @param conf the hbase configuration * @param htableName the table name * @param families the column family names - * @throws MasterNotRunningException - * @throws ZooKeeperConnectionException * @throws IOException */ - private static void createTable(Configuration conf, String htableName, String... families) throws MasterNotRunningException, ZooKeeperConnectionException, - IOException { - HBaseAdmin hbase = new HBaseAdmin(conf); + private static void createTable(Configuration conf, String htableName, String... 
families) throws IOException { + Connection connection = ConnectionFactory.createConnection(conf); + Admin hbase = connection.getAdmin(); try { - if (!hbase.tableExists(htableName)) { - HTableDescriptor desc = new HTableDescriptor(htableName); + TableName tableName = TableName.valueOf(htableName); + if (!hbase.tableExists(tableName)) { + HTableDescriptor desc = new HTableDescriptor(tableName); for (String s : families) { HColumnDescriptor meta = new HColumnDescriptor(s); desc.addFamily(meta); @@ -179,6 +181,7 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab } } finally { hbase.close(); + connection.close(); } } @@ -197,8 +200,8 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab } for (int i = 0; i < character.size(); i++) { Put put = new Put(Bytes.toBytes(character.get(i))); - put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY, Bytes.toBytes(play.get(i))); - put.add(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE, Bytes.toBytes(quote.get(i))); + put.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_PLAY, Bytes.toBytes(play.get(i))); + put.addColumn(COLUMN_FAMILY_SOURCE, COLUMN_QUALIFIER_SOURCE_QUOTE, Bytes.toBytes(quote.get(i))); list.add(put); } return list; @@ -238,7 +241,7 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab @Override public void process(Pair<String, String> input, Emitter<Put> emitter) { Put put = new Put(Bytes.toBytes(input.first())); - put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second())); + put.addColumn(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second())); emitter.emit(put); } }, HBaseTypes.puts()); http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml index 53b1199..075b197 100644 --- a/crunch-hbase/pom.xml +++ b/crunch-hbase/pom.xml @@ -88,6 +88,12 @@ under the License. 
<dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-mapreduce</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-testing-util</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java index 6f418a5..d485872 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java @@ -35,6 +35,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparatorImpl; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -126,9 +129,9 @@ public class HFileSourceIT implements Serializable { assertEquals(1, results.size()); Result result = Iterables.getOnlyElement(results); assertArrayEquals(ROW1, result.getRow()); - assertEquals(2, result.raw().length); - assertArrayEquals(VALUE1, result.getColumnLatest(FAMILY1, QUALIFIER1).getValue()); - assertArrayEquals(VALUE2, result.getColumnLatest(FAMILY1, QUALIFIER2).getValue()); + assertEquals(2, result.rawCells().length); + assertArrayEquals(VALUE1, CellUtil.cloneValue(result.getColumnLatestCell(FAMILY1, QUALIFIER1))); + assertArrayEquals(VALUE2, CellUtil.cloneValue(result.getColumnLatestCell(FAMILY1, QUALIFIER2))); } @Test @@ -142,11 +145,11 @@ public class HFileSourceIT implements Serializable { List<Result> results = doTestScanHFiles(kvs, scan); assertEquals(1, results.size()); Result result = Iterables.getOnlyElement(results); - List<KeyValue> kvs2 = result.getColumn(FAMILY1, QUALIFIER1); + List<Cell> kvs2 = result.getColumnCells(FAMILY1, QUALIFIER1); assertEquals(3, kvs2.size()); - assertArrayEquals(VALUE3, kvs2.get(0).getValue()); - assertArrayEquals(VALUE2, kvs2.get(1).getValue()); - assertArrayEquals(VALUE1, kvs2.get(2).getValue()); + assertArrayEquals(VALUE3, CellUtil.cloneValue(kvs2.get(0))); + assertArrayEquals(VALUE2, CellUtil.cloneValue(kvs2.get(1))); + assertArrayEquals(VALUE1, CellUtil.cloneValue(kvs2.get(2))); } @Test @@ -173,8 +176,8 @@ public class HFileSourceIT implements Serializable { scan.setStartRow(ROW1); List<Result> results = doTestScanHFiles(kvs, scan); assertEquals(2, results.size()); - assertArrayEquals(ROW2, kvs.get(0).getRow()); - assertArrayEquals(ROW3, kvs.get(1).getRow()); + assertArrayEquals(ROW2, results.get(0).getRow()); + assertArrayEquals(ROW3, results.get(1).getRow()); } //@Test @@ -214,8 +217,8 @@ public class HFileSourceIT implements Serializable { assertEquals(1, results.size()); Result result = Iterables.getOnlyElement(results); assertEquals(2, result.size()); - assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER1)); - assertNotNull(result.getColumnLatest(FAMILY2, QUALIFIER2)); + assertNotNull(result.getColumnLatestCell(FAMILY1, QUALIFIER1)); + assertNotNull(result.getColumnLatestCell(FAMILY2, QUALIFIER2)); } @Test @@ -230,7 +233,7 @@ public class HFileSourceIT implements Serializable 
{ assertEquals(1, results.size()); Result result = Iterables.getOnlyElement(results); assertEquals(1, result.size()); - assertNotNull(result.getColumnLatest(FAMILY1, QUALIFIER2)); + assertNotNull(result.getColumnLatestCell(FAMILY1, QUALIFIER2)); } @Test @@ -326,7 +329,7 @@ public class HFileSourceIT implements Serializable { FileSystem fs = FileSystem.get(conf); w = HFile.getWriterFactory(conf, new CacheConfig(conf)) .withPath(fs, inputPath) - .withComparator(KeyValue.COMPARATOR) + .withComparator(CellComparatorImpl.COMPARATOR) .withFileContext(new HFileContext()) .create(); for (KeyValue kv : sortedKVs) { http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java index 9027c1b..ffe2177 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java @@ -19,7 +19,6 @@ package org.apache.crunch.io.hbase; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Resources; import org.apache.commons.io.IOUtils; @@ -50,6 +49,7 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -57,11 +57,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -69,11 +72,11 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.KeyValueHeap; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; -import org.apache.hadoop.hbase.util.ByteBloomFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; @@ -97,13 +100,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import 
java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.crunch.types.writable.Writables.nulls; import static org.apache.crunch.types.writable.Writables.tableOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -152,21 +155,19 @@ public class HFileTargetIT implements Serializable { HBASE_TEST_UTILITY.startMiniCluster(1); } - private static HTable createTable(int splits) throws Exception { + private static Table createTable(int splits) throws Exception { HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY); return createTable(splits, hcol); } - private static HTable createTable(int splits, HColumnDescriptor... hcols) throws Exception { - byte[] tableName = Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000)); - HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin(); + private static Table createTable(int splits, HColumnDescriptor... hcols) throws Exception { + TableName tableName = TableName.valueOf(Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000))); HTableDescriptor htable = new HTableDescriptor(tableName); for (HColumnDescriptor hcol : hcols) { htable.addFamily(hcol); } - admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits)); - HBASE_TEST_UTILITY.waitTableAvailable(tableName, 30000); - return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName); + return HBASE_TEST_UTILITY.createTable(htable, + Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits)); } @AfterClass @@ -196,7 +197,7 @@ public class HFileTargetIT implements Serializable { FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration()); KeyValue kv = readFromHFiles(fs, outputPath, "and"); - assertEquals(375L, Bytes.toLong(kv.getValue())); + assertEquals(375L, Bytes.toLong(CellUtil.cloneValue(kv))); } @Test @@ -206,21 +207,25 @@ public class HFileTargetIT implements Serializable { Path outputPath = getTempPathOnHDFS("out"); byte[] columnFamilyA = Bytes.toBytes("colfamA"); byte[] columnFamilyB = Bytes.toBytes("colfamB"); - HTable testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB)); + Admin admin = HBASE_TEST_UTILITY.getAdmin(); + Table testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB)); + Connection connection = admin.getConnection(); + RegionLocator regionLocator = connection.getRegionLocator(testTable.getName()); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); PCollection<String> words = split(shakespeare, "\\s+"); PTable<String,Long> wordCounts = words.count(); PCollection<Put> wordCountPuts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB); HFileUtils.writePutsToHFilesForIncrementalLoad( wordCountPuts, - testTable, + connection, + testTable.getName(), outputPath); PipelineResult result = pipeline.run(); assertTrue(result.succeeded()); new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration()) - .doBulkLoad(outputPath, testTable); + .doBulkLoad(outputPath, admin, testTable, regionLocator); Map<String, Long> EXPECTED = ImmutableMap.<String, Long>builder() .put("__EMPTY__", 1345L) @@ -243,8 +248,12 @@ public class HFileTargetIT implements Serializable { Path inputPath = copyResourceFileToHDFS("shakes.txt"); Path outputPath1 = 
getTempPathOnHDFS("out1"); Path outputPath2 = getTempPathOnHDFS("out2"); - HTable table1 = createTable(26); - HTable table2 = createTable(26); + Admin admin = HBASE_TEST_UTILITY.getAdmin(); + Connection connection = admin.getConnection(); + Table table1 = createTable(26); + Table table2 = createTable(26); + RegionLocator regionLocator1 = connection.getRegionLocator(table1.getName()); + RegionLocator regionLocator2 = connection.getRegionLocator(table2.getName()); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration()); boolean onlyAffectedRegions = true; @@ -256,19 +265,21 @@ public class HFileTargetIT implements Serializable { PTable<String, Long> longWordCounts = longWords.count(); HFileUtils.writePutsToHFilesForIncrementalLoad( convertToPuts(shortWordCounts), - table1, + connection, + table1.getName(), outputPath1); HFileUtils.writePutsToHFilesForIncrementalLoad( convertToPuts(longWordCounts), - table2, + connection, + table2.getName(), outputPath2, onlyAffectedRegions); PipelineResult result = pipeline.run(); assertTrue(result.succeeded()); - loader.doBulkLoad(outputPath1, table1); - loader.doBulkLoad(outputPath2, table2); + loader.doBulkLoad(outputPath1, admin, table1, regionLocator1); + loader.doBulkLoad(outputPath2, admin, table2, regionLocator2); assertEquals(314L, getWordCountFromTable(table1, "of")); assertEquals(375L, getWordCountFromTable(table2, "and")); @@ -282,10 +293,12 @@ public class HFileTargetIT implements Serializable { Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration()); Path inputPath = copyResourceFileToHDFS("shakes.txt"); Path outputPath = getTempPathOnHDFS("out"); + Admin admin = HBASE_TEST_UTILITY.getAdmin(); + Connection connection = admin.getConnection(); HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY); hcol.setDataBlockEncoding(newBlockEncoding); hcol.setBloomFilterType(BloomType.ROWCOL); - HTable testTable = createTable(26, hcol); + Table testTable = createTable(26, hcol); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); PCollection<String> words = split(shakespeare, "\\s+"); @@ -293,7 +306,8 @@ public class HFileTargetIT implements Serializable { PCollection<Put> wordCountPuts = convertToPuts(wordCounts); HFileUtils.writePutsToHFilesForIncrementalLoad( wordCountPuts, - testTable, + connection, + testTable.getName(), outputPath); PipelineResult result = pipeline.run(); @@ -309,11 +323,11 @@ public class HFileTargetIT implements Serializable { } HFile.Reader reader = null; try { - reader = HFile.createReader(fs, f, new CacheConfig(conf), conf); + reader = HFile.createReader(fs, f, new CacheConfig(conf), true, conf); assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding()); BloomType bloomFilterType = BloomType.valueOf(Bytes.toString( - reader.loadFileInfo().get(StoreFile.BLOOM_FILTER_TYPE_KEY))); + reader.loadFileInfo().get(HStoreFile.BLOOM_FILTER_TYPE_KEY))); assertEquals(BloomType.ROWCOL, bloomFilterType); DataInput bloomMeta = reader.getGeneralBloomFilterMetadata(); assertNotNull(bloomMeta); @@ -337,7 +351,10 @@ public class HFileTargetIT implements Serializable { Pipeline pipeline = new MRPipeline(HFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration()); Path inputPath = copyResourceFileToHDFS("shakes.txt"); Path outputPath1 = getTempPathOnHDFS("out1"); - HTable table1 = createTable(26); + Admin admin = HBASE_TEST_UTILITY.getAdmin(); + Connection connection = admin.getConnection(); + Table table1 = 
createTable(26); + RegionLocator regionLocator1 = connection.getRegionLocator(table1.getName()); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); PCollection<String> words = split(shakespeare, "\\s+"); @@ -348,7 +365,8 @@ public class HFileTargetIT implements Serializable { PCollection<Put> wordPuts = convertToPuts(count); HFileUtils.writePutsToHFilesForIncrementalLoad( wordPuts, - table1, + connection, + table1.getName(), outputPath1, onlyAffectedRegions); @@ -393,7 +411,7 @@ public class HFileTargetIT implements Serializable { writtenPartitions.add((BytesWritable) wdc.deepCopy(next)); } - ImmutableList<byte[]> startKeys = ImmutableList.copyOf(table1.getStartKeys()); + ImmutableList<byte[]> startKeys = ImmutableList.copyOf(regionLocator1.getStartKeys()); // assert that only affected regions were loaded into assertTrue(startKeys.size() > writtenPartitions.size()); @@ -462,7 +480,7 @@ public class HFileTargetIT implements Serializable { long c = input.second(); Put p = new Put(Bytes.toBytes(w)); for (byte[] columnFamily : columnFamilies) { - p.add(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c)); + p.addColumn(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c)); } return p; } @@ -479,7 +497,7 @@ public class HFileTargetIT implements Serializable { } long c = input.second(); Cell cell = CellUtil.createCell(Bytes.toBytes(w), Bytes.toBytes(c)); - return Pair.of(KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()), null); + return Pair.of(KeyValueUtil.copyToNewKeyValue(cell), null); } }, tableOf(HBaseTypes.keyValues(), nulls())) .groupByKey(GroupingOptions.builder() @@ -503,28 +521,31 @@ public class HFileTargetIT implements Serializable { /** Reads the first value on a given row from a bunch of hfiles. */ private static KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException { List<KeyValueScanner> scanners = Lists.newArrayList(); - KeyValue fakeKV = KeyValue.createFirstOnRow(Bytes.toBytes(row)); + KeyValue fakeKV = KeyValueUtil.createFirstOnRow(Bytes.toBytes(row)); for (FileStatus e : fs.listStatus(mrOutputPath)) { Path f = e.getPath(); if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS" continue; } - StoreFile.Reader reader = new StoreFile.Reader( + StoreFileReader reader = new StoreFileReader( fs, f, new CacheConfig(fs.getConf()), + true, + new AtomicInteger(), + false, fs.getConf()); - StoreFileScanner scanner = reader.getStoreFileScanner(false, false); + StoreFileScanner scanner = reader.getStoreFileScanner(false, false, false, 0, 0, false); scanner.seek(fakeKV); // have to call seek of each underlying scanner, otherwise KeyValueHeap won't work scanners.add(scanner); } assertTrue(!scanners.isEmpty()); - KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); + KeyValueScanner kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); boolean seekOk = kvh.seek(fakeKV); assertTrue(seekOk); Cell kv = kvh.next(); kvh.close(); - return KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of()); + return KeyValueUtil.copyToNewKeyValue(kv); } private static Path copyResourceFileToHDFS(String resourceName) throws IOException { @@ -551,11 +572,11 @@ public class HFileTargetIT implements Serializable { return result.makeQualified(fs); } - private static long getWordCountFromTable(HTable table, String word) throws IOException { + private static long getWordCountFromTable(Table table, String word) throws IOException { return getWordCountFromTable(table, TEST_FAMILY, word); } - private static long 
getWordCountFromTable(HTable table, byte[] columnFamily, String word) throws IOException { + private static long getWordCountFromTable(Table table, byte[] columnFamily, String word) throws IOException { Get get = new Get(Bytes.toBytes(word)); get.addFamily(columnFamily); byte[] value = table.get(get).value(); http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index 4a06c0f..3de3a80 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -42,13 +42,14 @@ import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase; @@ -101,7 +102,7 @@ public class WordCountHBaseIT { @Override public void process(Pair<String, Long> input, Emitter<Put> emitter) { Put put = new Put(Bytes.toBytes(input.first())); - put.add(COUNTS_COLFAM, null, Bytes.toBytes(input.second())); + put.addColumn(COUNTS_COLFAM, null, Bytes.toBytes(input.second())); emitter.emit(put); } @@ -123,9 +124,9 @@ public class WordCountHBaseIT { @Before public void setUp() throws Exception { Configuration conf = HBaseConfiguration.create(tmpDir.getDefaultConfiguration()); + conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, tmpDir.getFile("hbase-staging").getAbsolutePath()); hbaseTestUtil = new HBaseTestingUtility(conf); - hbaseTestUtil.startMiniZKCluster(); - hbaseTestUtil.startMiniHBaseCluster(1, 1); + hbaseTestUtil.startMiniCluster(); } @Test @@ -141,8 +142,7 @@ public class WordCountHBaseIT { @After public void tearDown() throws Exception { - hbaseTestUtil.shutdownMiniHBaseCluster(); - hbaseTestUtil.shutdownMiniZKCluster(); + hbaseTestUtil.shutdownMiniCluster(); } public void run(Pipeline pipeline) throws Exception { @@ -153,20 +153,20 @@ public class WordCountHBaseIT { Random rand = new Random(); int postFix = rand.nextInt() & 0x7FFFFFFF; - String inputTableName = "crunch_words_" + postFix; - String outputTableName = "crunch_counts_" + postFix; - String otherTableName = "crunch_other_" + postFix; - String joinTableName = "crunch_join_words_" + postFix; + TableName inputTableName = TableName.valueOf("crunch_words_" + postFix); + TableName outputTableName = TableName.valueOf("crunch_counts_" + postFix); + TableName otherTableName = TableName.valueOf("crunch_other_" + postFix); + TableName joinTableName = TableName.valueOf("crunch_join_words_" + postFix); - HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM); - HTable outputTable = 
hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM); - HTable otherTable = hbaseTestUtil.createTable(Bytes.toBytes(otherTableName), COUNTS_COLFAM); + Table inputTable = hbaseTestUtil.createTable(inputTableName, WORD_COLFAM); + Table outputTable = hbaseTestUtil.createTable(outputTableName, COUNTS_COLFAM); + Table otherTable = hbaseTestUtil.createTable(otherTableName, COUNTS_COLFAM); int key = 0; key = put(inputTable, key, "cat"); key = put(inputTable, key, "cat"); key = put(inputTable, key, "dog"); - inputTable.flushCommits(); + inputTable.close(); //Setup scan using multiple scans that simply cut the rows in half. Scan scan = new Scan(); @@ -179,7 +179,7 @@ public class WordCountHBaseIT { HBaseSourceTarget source = null; if(clazz == null){ - source = new HBaseSourceTarget(TableName.valueOf(inputTableName), scan, scan2); + source = new HBaseSourceTarget(inputTableName, scan, scan2); }else{ source = new HBaseSourceTarget(inputTableName, clazz, new Scan[]{scan, scan2}); } @@ -200,14 +200,13 @@ public class WordCountHBaseIT { assertIsLong(outputTable, "dog", 1); // verify we can do joins. - HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM); + Table joinTable = hbaseTestUtil.createTable(joinTableName, WORD_COLFAM); try { key = 0; key = put(joinTable, key, "zebra"); key = put(joinTable, key, "donkey"); key = put(joinTable, key, "bird"); key = put(joinTable, key, "horse"); - joinTable.flushCommits(); } finally { joinTable.close(); } @@ -233,14 +232,14 @@ public class WordCountHBaseIT { assertDeleted(outputTable, "dog"); } - protected int put(HTable table, int key, String value) throws IOException { + protected int put(Table table, int key, String value) throws IOException { Put put = new Put(Bytes.toBytes(key)); - put.add(WORD_COLFAM, null, Bytes.toBytes(value)); + put.addColumn(WORD_COLFAM, null, Bytes.toBytes(value)); table.put(put); return key + 1; } - protected static void assertIsLong(HTable table, String key, long i) throws IOException { + protected static void assertIsLong(Table table, String key, long i) throws IOException { Get get = new Get(Bytes.toBytes(key)); get.addFamily(COUNTS_COLFAM); Result result = table.get(get); @@ -250,7 +249,7 @@ public class WordCountHBaseIT { assertEquals(i, Bytes.toLong(rawCount)); } - protected static void assertDeleted(HTable table, String key) throws IOException { + protected static void assertDeleted(Table table, String key) throws IOException { Get get = new Get(Bytes.toBytes(key)); get.addFamily(COUNTS_COLFAM); Result result = table.get(get); http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java index 4ac6c8e..880ab68 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; 
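The HBaseSourceTarget change that follows replaces the org.apache.hadoop.hbase.util.Base64 helper (removed in HBase 2.0) with commons-codec. Shown here as a standalone sketch, the two scan-serialization helpers it arrives at look roughly like this; the class name is illustrative only:

    import java.io.IOException;

    import org.apache.commons.codec.binary.Base64;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

    final class ScanStringCodec {
      // Serialize a Scan to its protobuf form, then Base64-encode it with
      // commons-codec instead of the removed HBase Base64 utility.
      static String convertScanToString(Scan scan) throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBase64String(proto.toByteArray());
      }

      static Scan convertStringToScan(String string) throws IOException {
        ClientProtos.Scan proto = ClientProtos.Scan.parseFrom(Base64.decodeBase64(string));
        return ProtobufUtil.toScan(proto);
      }
    }
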
http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index ede7603..a8c157d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -19,6 +19,7 @@ package org.apache.crunch.io.hbase; import java.io.IOException; +import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.Pair; import org.apache.crunch.ReadableData; @@ -49,7 +50,6 @@ import org.apache.hadoop.hbase.mapreduce.ResultSerialization; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; @@ -176,11 +176,11 @@ public class HBaseSourceTarget extends HBaseTarget implements static String convertScanToString(Scan scan) throws IOException { ClientProtos.Scan proto = ProtobufUtil.toScan(scan); - return Base64.encodeBytes(proto.toByteArray()); + return Base64.encodeBase64String(proto.toByteArray()); } public static Scan convertStringToScan(String string) throws IOException { - ClientProtos.Scan proto = ClientProtos.Scan.parseFrom(Base64.decode(string)); + ClientProtos.Scan proto = ClientProtos.Scan.parseFrom(Base64.decodeBase64(string)); return ProtobufUtil.toScan(proto); } http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java index 787b9c6..76a06c2 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTypes.java @@ -26,6 +26,7 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; @@ -103,7 +104,7 @@ public final class HBaseTypes { } public static BytesWritable keyValueToBytes(Cell input) { - return keyValueToBytes(KeyValue.cloneAndAddTags(input, ImmutableList.<Tag>of())); + return keyValueToBytes(KeyValueUtil.copyToNewKeyValue(input)); } public static BytesWritable keyValueToBytes(KeyValue kv) { http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java index b286f51..595e86d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java +++ 
b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileInputFormat.java @@ -25,7 +25,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -93,7 +96,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> { Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(conf); LOG.info("Initialize HFileRecordReader for {}", path); - this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf); + this.in = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); // The file info must be loaded before the scanner can be used. // This seems like a bug in HBase, but it's easily worked around. @@ -129,8 +132,8 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> { if(LOG.isInfoEnabled()) { LOG.info("Seeking to start row {}", Bytes.toStringBinary(startRow)); } - KeyValue kv = KeyValue.createFirstOnRow(startRow); - hasNext = seekAtOrAfter(scanner, kv); + Cell cell = PrivateCellUtil.createFirstOnRow(startRow, 0, (short) startRow.length); + hasNext = seekAtOrAfter(scanner, cell); } else { LOG.info("Seeking to start"); hasNext = scanner.seekTo(); @@ -142,7 +145,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> { if (!hasNext) { return false; } - value = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of()); + value = KeyValueUtil.copyToNewKeyValue(scanner.getCell()); if (stopRow != null && Bytes.compareTo( value.getRowArray(), value.getRowOffset(), value.getRowLength(), @@ -185,7 +188,7 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> { // This method is copied from o.a.h.hbase.regionserver.StoreFileScanner, as we don't want // to depend on it. - private static boolean seekAtOrAfter(HFileScanner s, KeyValue k) + private static boolean seekAtOrAfter(HFileScanner s, Cell k) throws IOException { int result = s.seekTo(k); if(result < 0) { @@ -233,4 +236,4 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, KeyValue> { // This file isn't splittable. 
return false; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java index 50d5a0b..3cb3ce5 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java @@ -19,28 +19,25 @@ */ package org.apache.crunch.io.hbase; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import com.google.common.io.ByteStreams; -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.RecordWriter; @@ -49,10 +46,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; -import java.net.InetSocketAddress; /** * This is a thin wrapper of {@link HFile.Writer}. 
It only calls {@link HFile.Writer#append} @@ -66,115 +60,83 @@ import java.net.InetSocketAddress; */ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> { + // HCOLUMN_DESCRIPTOR_KEY is no longer used, but left for binary compatibility public static final String HCOLUMN_DESCRIPTOR_KEY = "hbase.hfileoutputformat.column.descriptor"; + public static final String HCOLUMN_DESCRIPTOR_COMPRESSION_TYPE_KEY = "hbase.hfileoutputformat.column.descriptor.compressiontype"; + public static final String HCOLUMN_DESCRIPTOR_DATA_BLOCK_ENCODING_KEY = "hbase.hfileoutputformat.column.descriptor.datablockencoding"; + public static final String HCOLUMN_DESCRIPTOR_BLOOM_FILTER_TYPE_KEY = "hbase.hfileoutputformat.column.descriptor.bloomfiltertype"; private static final String COMPACTION_EXCLUDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.compaction.exclude"; private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormatForCrunch.class); private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); - private final TimeRangeTracker trt = new TimeRangeTracker(); @Override public RecordWriter<Object, Cell> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { Path outputPath = getDefaultWorkFile(context, ""); - final Configuration conf = context.getConfiguration(); - FileSystem fs = new HFileSystem(outputPath.getFileSystem(conf)); + Configuration conf = context.getConfiguration(); + FileSystem fs = outputPath.getFileSystem(conf); final boolean compactionExclude = conf.getBoolean( COMPACTION_EXCLUDE_CONF_KEY, false); - String hcolStr = conf.get(HCOLUMN_DESCRIPTOR_KEY); - if (hcolStr == null) { - throw new AssertionError(HCOLUMN_DESCRIPTOR_KEY + " is not set in conf"); - } - byte[] hcolBytes; - try { - hcolBytes = Hex.decodeHex(hcolStr.toCharArray()); - } catch (DecoderException e) { - throw new AssertionError("Bad hex string: " + hcolStr); - } - HColumnDescriptor hcol = new HColumnDescriptor(); - hcol.readFields(new DataInputStream(new ByteArrayInputStream(hcolBytes))); LOG.info("Output path: {}", outputPath); - LOG.info("HColumnDescriptor: {}", hcol.toString()); Configuration noCacheConf = new Configuration(conf); noCacheConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); - final StoreFile.WriterBuilder writerBuilder = new StoreFile.WriterBuilder(conf, new CacheConfig(noCacheConf), fs) - .withComparator(KeyValue.COMPARATOR) - .withFileContext(getContext(hcol)) + StoreFileWriter.Builder writerBuilder = + new StoreFileWriter.Builder(conf, new CacheConfig(noCacheConf), fs) + .withComparator(CellComparatorImpl.COMPARATOR) .withFilePath(outputPath) - .withBloomType(hcol.getBloomFilterType()); + .withFileContext(getContext(conf)); + String bloomFilterType = conf.get(HCOLUMN_DESCRIPTOR_BLOOM_FILTER_TYPE_KEY); + if (bloomFilterType != null) { + writerBuilder.withBloomType(BloomType.valueOf(bloomFilterType)); + } + final StoreFileWriter writer = writerBuilder.build(); return new RecordWriter<Object, Cell>() { - StoreFile.Writer writer = null; + long maxSeqId = 0L; @Override public void write(Object row, Cell cell) throws IOException { - - if (writer == null) { - writer = writerBuilder - .withFavoredNodes(getPreferredNodes(conf, cell)) - .build(); - } - - KeyValue copy = KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()); + KeyValue copy = KeyValueUtil.copyToNewKeyValue(cell); if (copy.getTimestamp() == HConstants.LATEST_TIMESTAMP) { copy.updateLatestStamp(now); } writer.append(copy); - trt.includeTimestamp(copy); + long 
seqId = cell.getSequenceId(); + if (seqId > maxSeqId) { + maxSeqId = seqId; + } } @Override public void close(TaskAttemptContext c) throws IOException { - if (writer != null) { - writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, - Bytes.toBytes(System.currentTimeMillis())); - writer.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(context.getTaskAttemptID().toString())); - writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(true)); - writer.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)); - writer.appendFileInfo(StoreFile.TIMERANGE_KEY, - WritableUtils.toByteArray(trt)); - writer.close(); - } + // true => product of major compaction + writer.appendMetadata(maxSeqId, true); + writer.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())); + writer.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(context.getTaskAttemptID().toString())); + writer.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)); + writer.close(); } }; } - /** - * Returns the "preferred" node for the given cell, or null if no preferred node can be found. The "preferred" - * node for a cell is defined as the host where the region server is located that is hosting the region that will - * contain the given cell. - */ - private InetSocketAddress[] getPreferredNodes(Configuration conf, Cell cell) throws IOException { - String regionLocationFilePathStr = conf.get(RegionLocationTable.REGION_LOCATION_TABLE_PATH); - if (regionLocationFilePathStr != null) { - LOG.debug("Reading region location file from {}", regionLocationFilePathStr); - Path regionLocationPath = new Path(regionLocationFilePathStr); - try (FSDataInputStream inputStream = regionLocationPath.getFileSystem(conf).open(regionLocationPath)) { - RegionLocationTable regionLocationTable = RegionLocationTable.deserialize(inputStream); - InetSocketAddress preferredNodeForRow = regionLocationTable.getPreferredNodeForRow(CellUtil.cloneRow(cell)); - if (preferredNodeForRow != null) { - return new InetSocketAddress[] { preferredNodeForRow }; - } else { - return null; - } - } - } else { - LOG.warn("No region location file path found in configuration"); - return null; + private HFileContext getContext(Configuration conf) { + HFileContextBuilder contextBuilder = new HFileContextBuilder(); + String compressionType = conf.get(HCOLUMN_DESCRIPTOR_COMPRESSION_TYPE_KEY); + if (compressionType != null) { + contextBuilder.withCompression(HFileWriterImpl.compressionByName(compressionType)); } - } - - private HFileContext getContext(HColumnDescriptor desc) { - HFileContext ctxt = new HFileContext(); - ctxt.setDataBlockEncoding(desc.getDataBlockEncoding()); - ctxt.setCompression(desc.getCompression()); - return ctxt; + String dataBlockEncoding = conf.get(HCOLUMN_DESCRIPTOR_DATA_BLOCK_ENCODING_KEY); + if (dataBlockEncoding != null) { + contextBuilder.withDataBlockEncoding(DataBlockEncoding.valueOf(dataBlockEncoding)); + } + return contextBuilder.build(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java index 14e6118..29af019 100644 --- 
a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -41,7 +42,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> { Configuration conf = fs.getConf(); CacheConfig cacheConfig = new CacheConfig(conf); try { - HFile.Reader hfr = HFile.createReader(fs, path, cacheConfig, conf); + HFile.Reader hfr = HFile.createReader(fs, path, cacheConfig, true, conf); HFileScanner scanner = hfr.getScanner( conf.getBoolean(HFILE_SCANNER_CACHE_BLOCKS, false), conf.getBoolean(HFILE_SCANNER_PREAD, false)); @@ -59,7 +60,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> { public HFileIterator(HFileScanner scanner) { this.scanner = scanner; - this.curr = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of()); + this.curr = KeyValueUtil.copyToNewKeyValue(scanner.getCell()); } @Override @@ -72,7 +73,7 @@ public class HFileReaderFactory implements FileReaderFactory<KeyValue> { KeyValue ret = curr; try { if (scanner.next()) { - curr = KeyValue.cloneAndAddTags(scanner.getKeyValue(), ImmutableList.<Tag>of()); + curr = KeyValueUtil.copyToNewKeyValue(scanner.getCell()); } else { curr = null; } http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java index 8593a76..b1ce5ba 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java @@ -17,8 +17,6 @@ */ package org.apache.crunch.io.hbase; -import com.google.common.base.Preconditions; -import org.apache.commons.codec.binary.Hex; import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.types.Converter; @@ -31,26 +29,28 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Job; public class HFileTarget extends FileTargetImpl { - private static final HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor(); - public HFileTarget(String path) { this(new Path(path)); } public HFileTarget(Path path) { - this(path, DEFAULT_COLUMN_DESCRIPTOR); + this(path, null); } public HFileTarget(Path path, HColumnDescriptor hcol) { super(path, HFileOutputFormatForCrunch.class, SequentialFileNamingScheme.getInstance()); - Preconditions.checkNotNull(hcol); - outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, - Hex.encodeHexString(WritableUtils.toByteArray(hcol))); + if (hcol != null) { + outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_COMPRESSION_TYPE_KEY, + hcol.getCompressionType().getName()); + 
outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_DATA_BLOCK_ENCODING_KEY, + hcol.getDataBlockEncoding().name()); + outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_BLOOM_FILTER_TYPE_KEY, + hcol.getBloomFilterType().name()); + } } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java index 0db536b..d85481d 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java @@ -59,12 +59,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; @@ -117,7 +119,7 @@ public final class HFileUtils { } private int compareType(KeyValue l, KeyValue r) { - return (int) r.getType() - (int) l.getType(); + return (int) r.getTypeByte() - (int) l.getTypeByte(); } }; @@ -272,9 +274,9 @@ public final class HFileUtils { Cell leftKey = new KeyValue(left, loffset + 8, llength - 8); Cell rightKey = new KeyValue(right, roffset + 8, rlength - 8); - byte[] lRow = leftKey.getRow(); - byte[] rRow = rightKey.getRow(); - int rowCmp = Bytes.compareTo(lRow, rRow); + int rowCmp = Bytes.compareTo( + leftKey.getRowArray(), leftKey.getRowOffset(), leftKey.getRowLength(), + rightKey.getRowArray(), rightKey.getRowOffset(), rightKey.getRowLength()); if (rowCmp != 0) { return rowCmp; } else { @@ -360,7 +362,7 @@ public final class HFileUtils { List<KeyValue> cells = Lists.newArrayList(); for (Cell kv : input.second()) { try { - cells.add(KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of())); // assuming the input fits into memory + cells.add(KeyValueUtil.copyToNewKeyValue(kv)); // assuming the input fits in memory } catch (Exception e) { throw new RuntimeException(e); } @@ -376,9 +378,10 @@ public final class HFileUtils { public static <C extends Cell> void writeToHFilesForIncrementalLoad( PCollection<C> cells, - HTable table, + Connection connection, + TableName tableName, Path outputPath) throws IOException { - writeToHFilesForIncrementalLoad(cells, table, outputPath, false); + writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, false); } /** @@ -392,18 +395,21 @@ public final class HFileUtils { */ public static <C extends Cell> void writeToHFilesForIncrementalLoad( PCollection<C> cells, - HTable table, + Connection connection, + TableName tableName, Path outputPath, boolean limitToAffectedRegions) throws IOException { + Table table = connection.getTable(tableName); + RegionLocator regionLocator = connection.getRegionLocator(tableName); HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies(); if (families.length == 
0) { LOG.warn("{} has no column families", table); return; } - PCollection<C> partitioned = sortAndPartition(cells, table, limitToAffectedRegions); + PCollection<C> partitioned = sortAndPartition(cells, regionLocator, limitToAffectedRegions); RegionLocationTable regionLocationTable = RegionLocationTable.create( table.getName().getNameAsString(), - ((RegionLocator) table).getAllRegionLocations()); + regionLocator.getAllRegionLocations()); Path regionLocationFilePath = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "regionLocations" + table.getName().getNameAsString()); writeRegionLocationTable(cells.getPipeline().getConfiguration(), regionLocationFilePath, regionLocationTable); @@ -420,9 +426,10 @@ public final class HFileUtils { public static void writePutsToHFilesForIncrementalLoad( PCollection<Put> puts, - HTable table, + Connection connection, + TableName tableName, Path outputPath) throws IOException { - writePutsToHFilesForIncrementalLoad(puts, table, outputPath, false); + writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath, false); } /** @@ -436,7 +443,8 @@ public final class HFileUtils { */ public static void writePutsToHFilesForIncrementalLoad( PCollection<Put> puts, - HTable table, + Connection connection, + TableName tableName, Path outputPath, boolean limitToAffectedRegions) throws IOException { PCollection<Cell> cells = puts.parallelDo("ConvertPutToCells", new DoFn<Put, Cell>() { @@ -447,21 +455,21 @@ public final class HFileUtils { } } }, HBaseTypes.cells()); - writeToHFilesForIncrementalLoad(cells, table, outputPath, limitToAffectedRegions); + writeToHFilesForIncrementalLoad(cells, connection, tableName, outputPath, limitToAffectedRegions); } - public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table) throws IOException { - return sortAndPartition(cells, table, false); + public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, RegionLocator regionLocator) throws IOException { + return sortAndPartition(cells, regionLocator, false); } /** - * Sorts and partitions the provided <code>cells</code> for the given <code>table</code> to ensure all elements that belong + * Sorts and partitions the provided <code>cells</code> for the given <code>regionLocator</code> to ensure all elements that belong * in the same region end up in the same reducer. The flag <code>limitToAffectedRegions</code>, when set to true, will identify * the regions the data in <code>cells</code> belongs to and will set the number of reducers equal to the number of identified * affected regions. If set to false, then all regions will be used, and the number of reducers will be set to the number * of regions in the table. 
*/ - public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, HTable table, boolean limitToAffectedRegions) throws IOException { + public static <C extends Cell> PCollection<C> sortAndPartition(PCollection<C> cells, RegionLocator regionLocator, boolean limitToAffectedRegions) throws IOException { Configuration conf = cells.getPipeline().getConfiguration(); PTable<C, Void> t = cells.parallelDo( "Pre-partition", @@ -474,9 +482,9 @@ public final class HFileUtils { List<KeyValue> splitPoints; if(limitToAffectedRegions) { - splitPoints = getSplitPoints(table, t); + splitPoints = getSplitPoints(regionLocator, t); } else { - splitPoints = getSplitPoints(table); + splitPoints = getSplitPoints(regionLocator); } Path partitionFile = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), "partition"); writePartitionInfo(conf, partitionFile, splitPoints); @@ -489,10 +497,10 @@ public final class HFileUtils { return t.groupByKey(options).ungroup().keys(); } - private static List<KeyValue> getSplitPoints(HTable table) throws IOException { - List<byte[]> startKeys = ImmutableList.copyOf(table.getStartKeys()); + private static List<KeyValue> getSplitPoints(RegionLocator regionLocator) throws IOException { + List<byte[]> startKeys = ImmutableList.copyOf(regionLocator.getStartKeys()); if (startKeys.isEmpty()) { - throw new AssertionError(table + " has no regions!"); + throw new AssertionError(regionLocator.getName().getNameAsString() + " has no regions!"); } List<KeyValue> splitPoints = Lists.newArrayList(); for (byte[] startKey : startKeys.subList(1, startKeys.size())) { @@ -503,12 +511,12 @@ public final class HFileUtils { return splitPoints; } - private static <C> List<KeyValue> getSplitPoints(HTable table, PTable<C, Void> affectedRows) throws IOException { + private static <C> List<KeyValue> getSplitPoints(RegionLocator regionLocator, PTable<C, Void> affectedRows) throws IOException { List<byte[]> startKeys; try { - startKeys = Lists.newArrayList(table.getStartKeys()); + startKeys = Lists.newArrayList(regionLocator.getStartKeys()); if (startKeys.isEmpty()) { - throw new AssertionError(table + " has no regions!"); + throw new AssertionError(regionLocator.getName().getNameAsString() + " has no regions!"); } } catch (IOException e) { throw new CrunchRuntimeException(e); @@ -604,8 +612,8 @@ public final class HFileUtils { if (kvs.isEmpty()) { return null; } - if (kvs.size() == 1 && kvs.get(0).getType() == KeyValue.Type.Put.getCode()) { - return new Result(kvs); + if (kvs.size() == 1 && kvs.get(0).getTypeByte() == KeyValue.Type.Put.getCode()) { + return Result.create(Collections.<Cell>singletonList(kvs.get(0))); } kvs = maybeDeleteFamily(kvs); @@ -613,7 +621,7 @@ public final class HFileUtils { // In-place sort KeyValues by family, qualifier and then timestamp reversely (whenever ties, deletes appear first). 
Collections.sort(kvs, KEY_VALUE_COMPARATOR); - List<KeyValue> results = Lists.newArrayListWithCapacity(kvs.size()); + List<Cell> results = Lists.newArrayListWithCapacity(kvs.size()); for (int i = 0, j; i < kvs.size(); i = j) { j = i + 1; while (j < kvs.size() && hasSameFamilyAndQualifier(kvs.get(i), kvs.get(j))) { @@ -624,7 +632,7 @@ public final class HFileUtils { if (results.isEmpty()) { return null; } - return new Result(results); + return Result.create(results); } /** @@ -634,7 +642,7 @@ public final class HFileUtils { private static List<KeyValue> maybeDeleteFamily(List<KeyValue> kvs) { long deleteFamilyCut = -1; for (KeyValue kv : kvs) { - if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) { + if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { deleteFamilyCut = Math.max(deleteFamilyCut, kv.getTimestamp()); } } @@ -643,7 +651,7 @@ public final class HFileUtils { } List<KeyValue> results = Lists.newArrayList(); for (KeyValue kv : kvs) { - if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) { + if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { continue; } if (kv.getTimestamp() <= deleteFamilyCut) { @@ -675,7 +683,7 @@ public final class HFileUtils { if (kvs.isEmpty()) { return kvs; } - if (kvs.get(0).getType() == KeyValue.Type.Put.getCode()) { + if (kvs.get(0).getTypeByte() == KeyValue.Type.Put.getCode()) { return kvs; // shortcut for the common case } @@ -685,16 +693,16 @@ public final class HFileUtils { if (results.size() >= versions) { break; } - if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) { + if (kv.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { break; - } else if (kv.getType() == KeyValue.Type.Put.getCode()) { + } else if (kv.getTypeByte() == KeyValue.Type.Put.getCode()) { if (kv.getTimestamp() != previousDeleteTimestamp) { results.add(kv); } - } else if (kv.getType() == KeyValue.Type.Delete.getCode()) { + } else if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { previousDeleteTimestamp = kv.getTimestamp(); } else { - throw new AssertionError("Unexpected KeyValue type: " + kv.getType()); + throw new AssertionError("Unexpected KeyValue type: " + kv.getTypeByte()); } } return results; http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java index c772515..0657a01 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java @@ -21,7 +21,6 @@ package org.apache.crunch.io.hbase; import org.apache.crunch.Pair; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java index ebef5d3..647eea4 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java 
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java @@ -71,12 +71,12 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { try { table.close(); } catch (IOException e) { - LOG.error("Exception closing HTable: {}", table.getName(), e); + LOG.error("Exception closing Table: {}", table.getName(), e); } try { connection.close(); } catch (IOException e) { - LOG.error("Exception closing HTable: {}", table.getName(), e); + LOG.error("Exception closing Table: {}", table.getName(), e); } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/pom.xml b/crunch-hcatalog/pom.xml index e99814b..59ebe45 100644 --- a/crunch-hcatalog/pom.xml +++ b/crunch-hcatalog/pom.xml @@ -37,11 +37,51 @@ under the License. <dependency> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog-core</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> + <exclusions> + <exclusion> <!-- declare the exclusion here --> + <groupId>org.eclipse.jetty.aggregate</groupId> + <artifactId>jetty-all</artifactId> + </exclusion> + <exclusion> <!-- declare the exclusion here --> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <exclusions> + <exclusion> <!-- declare the exclusion here --> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> @@ -125,8 +165,7 @@ under the License. <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-hbase-handler</artifactId> - <version>${hive.version}</version> - <scope>test</scope> + <version>3.0.0</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-spark/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml index 233bb34..34189dc 100644 --- a/crunch-spark/pom.xml +++ b/crunch-spark/pom.xml @@ -51,8 +51,8 @@ under the License. <scope>provided</scope> <exclusions> <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> </exclusion> <exclusion> <groupId>com.sun.jersey</groupId> @@ -95,6 +95,11 @@ under the License. 
<scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-mapreduce</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/crunch/blob/1b2c058c/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java index 815aaff..ab0b061 100644 --- a/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkHFileTargetIT.java @@ -36,24 +36,29 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.KeyValueHeap; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; @@ -72,6 +77,7 @@ import java.nio.charset.Charset; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.crunch.types.writable.Writables.nulls; import static org.apache.crunch.types.writable.Writables.tableOf; @@ -125,21 +131,19 @@ public class SparkHFileTargetIT implements Serializable { HBASE_TEST_UTILITY.startMiniCluster(1); } - private static HTable createTable(int splits) throws Exception { + private static Table createTable(int splits) throws Exception { HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY); return createTable(splits, hcol); } - private static HTable createTable(int splits, HColumnDescriptor... hcols) throws Exception { - byte[] tableName = Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000)); - HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin(); + private static Table createTable(int splits, HColumnDescriptor... 
hcols) throws Exception { + TableName tableName = TableName.valueOf(Bytes.toBytes("test_table_" + RANDOM.nextInt(1000000000))); HTableDescriptor htable = new HTableDescriptor(tableName); for (HColumnDescriptor hcol : hcols) { htable.addFamily(hcol); } - admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits)); - HBASE_TEST_UTILITY.waitTableAvailable(tableName, 30000); - return new HTable(HBASE_TEST_UTILITY.getConfiguration(), tableName); + return HBASE_TEST_UTILITY.createTable(htable, + Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits)); } @AfterClass @@ -170,7 +174,7 @@ public class SparkHFileTargetIT implements Serializable { FileSystem fs = FileSystem.get(HBASE_TEST_UTILITY.getConfiguration()); KeyValue kv = readFromHFiles(fs, outputPath, "and"); - assertEquals(375L, Bytes.toLong(kv.getValue())); + assertEquals(375L, Bytes.toLong(CellUtil.cloneValue(kv))); pipeline.done(); } @@ -182,21 +186,25 @@ public class SparkHFileTargetIT implements Serializable { Path outputPath = getTempPathOnHDFS("out"); byte[] columnFamilyA = Bytes.toBytes("colfamA"); byte[] columnFamilyB = Bytes.toBytes("colfamB"); - HTable testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB)); + Admin admin = HBASE_TEST_UTILITY.getAdmin(); + Table testTable = createTable(26, new HColumnDescriptor(columnFamilyA), new HColumnDescriptor(columnFamilyB)); + Connection connection = admin.getConnection(); + RegionLocator regionLocator = connection.getRegionLocator(testTable.getName()); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); PCollection<String> words = split(shakespeare, "\\s+"); PTable<String,Long> wordCounts = words.count(); PCollection<Put> wordCountPuts = convertToPuts(wordCounts, columnFamilyA, columnFamilyB); HFileUtils.writePutsToHFilesForIncrementalLoad( wordCountPuts, - testTable, + admin.getConnection(), + testTable.getName(), outputPath); PipelineResult result = pipeline.run(); assertTrue(result.succeeded()); new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration()) - .doBulkLoad(outputPath, testTable); + .doBulkLoad(outputPath, admin, testTable, regionLocator); Map<String, Long> EXPECTED = ImmutableMap.<String, Long>builder() .put("__EMPTY__", 1345L) @@ -221,8 +229,12 @@ public class SparkHFileTargetIT implements Serializable { Path inputPath = copyResourceFileToHDFS("shakes.txt"); Path outputPath1 = getTempPathOnHDFS("out1"); Path outputPath2 = getTempPathOnHDFS("out2"); - HTable table1 = createTable(26); - HTable table2 = createTable(26); + Admin admin = HBASE_TEST_UTILITY.getAdmin(); + Table table1 = createTable(26); + Table table2 = createTable(26); + Connection connection = admin.getConnection(); + RegionLocator regionLocator1 = connection.getRegionLocator(table1.getName()); + RegionLocator regionLocator2 = connection.getRegionLocator(table2.getName()); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration()); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); @@ -233,18 +245,20 @@ public class SparkHFileTargetIT implements Serializable { PTable<String, Long> longWordCounts = longWords.count(); HFileUtils.writePutsToHFilesForIncrementalLoad( convertToPuts(shortWordCounts), - table1, + connection, + table1.getName(), outputPath1); HFileUtils.writePutsToHFilesForIncrementalLoad( convertToPuts(longWordCounts), - table2, + connection, + table2.getName(), outputPath2); PipelineResult result 
= pipeline.run(); assertTrue(result.succeeded()); - loader.doBulkLoad(outputPath1, table1); - loader.doBulkLoad(outputPath2, table2); + loader.doBulkLoad(outputPath1, admin, table1, regionLocator1); + loader.doBulkLoad(outputPath2, admin, table2, regionLocator2); assertEquals(314L, getWordCountFromTable(table1, "of")); assertEquals(375L, getWordCountFromTable(table2, "and")); @@ -260,9 +274,11 @@ public class SparkHFileTargetIT implements Serializable { SparkHFileTargetIT.class, HBASE_TEST_UTILITY.getConfiguration()); Path inputPath = copyResourceFileToHDFS("shakes.txt"); Path outputPath = getTempPathOnHDFS("out"); + Admin admin = HBASE_TEST_UTILITY.getAdmin(); HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY); hcol.setDataBlockEncoding(newBlockEncoding); - HTable testTable = createTable(26, hcol); + Table testTable = createTable(26, hcol); + Connection connection = admin.getConnection(); PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings())); PCollection<String> words = split(shakespeare, "\\s+"); @@ -270,7 +286,8 @@ public class SparkHFileTargetIT implements Serializable { PCollection<Put> wordCountPuts = convertToPuts(wordCounts); HFileUtils.writePutsToHFilesForIncrementalLoad( wordCountPuts, - testTable, + connection, + testTable.getName(), outputPath); PipelineResult result = pipeline.run(); @@ -286,7 +303,7 @@ public class SparkHFileTargetIT implements Serializable { } HFile.Reader reader = null; try { - reader = HFile.createReader(fs, f, new CacheConfig(conf), conf); + reader = HFile.createReader(fs, f, new CacheConfig(conf), true, conf); assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding()); } finally { if (reader != null) { @@ -314,7 +331,7 @@ public class SparkHFileTargetIT implements Serializable { long c = input.second(); Put p = new Put(Bytes.toBytes(w)); for (byte[] columnFamily : columnFamilies) { - p.add(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c)); + p.addColumn(columnFamily, TEST_QUALIFIER, Bytes.toBytes(c)); } return p; } @@ -331,7 +348,7 @@ public class SparkHFileTargetIT implements Serializable { } long c = input.second(); Cell cell = CellUtil.createCell(Bytes.toBytes(w), Bytes.toBytes(c)); - return Pair.of(KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()), null); + return Pair.of(KeyValueUtil.copyToNewKeyValue(cell), null); } }, tableOf(HBaseTypes.keyValues(), nulls())) .groupByKey(GroupingOptions.builder() @@ -355,28 +372,31 @@ public class SparkHFileTargetIT implements Serializable { /** Reads the first value on a given row from a bunch of hfiles. 
*/ private static KeyValue readFromHFiles(FileSystem fs, Path mrOutputPath, String row) throws IOException { List<KeyValueScanner> scanners = Lists.newArrayList(); - KeyValue fakeKV = KeyValue.createFirstOnRow(Bytes.toBytes(row)); + KeyValue fakeKV = KeyValueUtil.createFirstOnRow(Bytes.toBytes(row)); for (FileStatus e : fs.listStatus(mrOutputPath)) { Path f = e.getPath(); if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS" continue; } - StoreFile.Reader reader = new StoreFile.Reader( - fs, - f, - new CacheConfig(fs.getConf()), - fs.getConf()); - StoreFileScanner scanner = reader.getStoreFileScanner(false, false); + StoreFileReader reader = new StoreFileReader( + fs, + f, + new CacheConfig(fs.getConf()), + true, + new AtomicInteger(), + false, + fs.getConf()); + StoreFileScanner scanner = reader.getStoreFileScanner(false, false, false, 0, 0, false); scanner.seek(fakeKV); // have to call seek of each underlying scanner, otherwise KeyValueHeap won't work scanners.add(scanner); } assertTrue(!scanners.isEmpty()); - KeyValueScanner kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); + KeyValueScanner kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); boolean seekOk = kvh.seek(fakeKV); assertTrue(seekOk); Cell kv = kvh.next(); kvh.close(); - return KeyValue.cloneAndAddTags(kv, ImmutableList.<Tag>of()); + return KeyValueUtil.copyToNewKeyValue(kv); } private static Path copyResourceFileToHDFS(String resourceName) throws IOException { @@ -403,11 +423,11 @@ public class SparkHFileTargetIT implements Serializable { return result.makeQualified(fs); } - private static long getWordCountFromTable(HTable table, String word) throws IOException { + private static long getWordCountFromTable(Table table, String word) throws IOException { return getWordCountFromTable(table, TEST_FAMILY, word); } - private static long getWordCountFromTable(HTable table, byte[] columnFamily, String word) throws IOException { + private static long getWordCountFromTable(Table table, byte[] columnFamily, String word) throws IOException { Get get = new Get(Bytes.toBytes(word)); get.addFamily(columnFamily); byte[] value = table.get(get).value();
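For code that calls crunch-hbase, the visible effect of this patch is the removal of HTable from the HFileUtils entry points: writeToHFilesForIncrementalLoad and writePutsToHFilesForIncrementalLoad now take a Connection plus TableName, and the bulk load itself goes through Admin, Table and RegionLocator. Below is a minimal caller sketch against those new signatures, assuming the HBase 2.0 client and the same mapreduce.LoadIncrementalHFiles variant the integration tests above use; the class and method names in the sketch are illustrative and not part of the patch.

import java.io.IOException;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadSketch {

  // Writes HFiles for 'puts' under 'outputPath', runs the pipeline, then bulk-loads
  // the files into 'tableName'. Mirrors the flow of the updated integration tests.
  public static void writeAndBulkLoad(Pipeline pipeline, PCollection<Put> puts,
                                      TableName tableName, Path outputPath) throws Exception {
    Configuration conf = HBaseConfiguration.create(pipeline.getConfiguration());
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin();
         Table table = connection.getTable(tableName);
         RegionLocator regionLocator = connection.getRegionLocator(tableName)) {

      // HTable is gone from the signature; pass the Connection and TableName instead.
      HFileUtils.writePutsToHFilesForIncrementalLoad(puts, connection, tableName, outputPath);

      if (!pipeline.run().succeeded()) {
        throw new IOException("Pipeline failed; not bulk-loading " + outputPath);
      }

      // Four-argument doBulkLoad variant, as used by the updated ITs.
      new LoadIncrementalHFiles(conf).doBulkLoad(outputPath, admin, table, regionLocator);
    }
  }
}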
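The other change repeated across HFileUtils and the ITs is the move off removed KeyValue/Result accessors: getType() becomes getTypeByte(), getRow()/getValue() become offset-based access or CellUtil.cloneValue(), and the Result(List<KeyValue>) constructor becomes Result.create(). A small illustrative sketch of those replacements (not code from the patch) follows.

import java.util.Collections;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CellAccessorSketch {

  // cell.getType() was removed; the type byte is read via getTypeByte().
  static boolean isPut(Cell cell) {
    return cell.getTypeByte() == KeyValue.Type.Put.getCode();
  }

  // kv.getValue() is gone; CellUtil.cloneValue copies the value bytes out of the backing array.
  static long longValue(Cell cell) {
    return Bytes.toLong(CellUtil.cloneValue(cell));
  }

  // Row comparison without materializing byte[] copies, as done in the updated comparator.
  static int compareRows(Cell left, Cell right) {
    return Bytes.compareTo(
        left.getRowArray(), left.getRowOffset(), left.getRowLength(),
        right.getRowArray(), right.getRowOffset(), right.getRowLength());
  }

  // The Result(List<KeyValue>) constructor is gone; Result.create takes a list of cells.
  static Result singleCellResult(Cell cell) {
    return Result.create(Collections.<Cell>singletonList(cell));
  }
}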
