This is an automated email from the ASF dual-hosted git repository.
jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
The following commit(s) were added to refs/heads/master by this push:
new 8a1c8b4 CRUNCH-688: Fix HFile node affinity for non-default namespace HBase tables
new 67e2890 Merge pull request #27 from noslowerdna/CRUNCH-688
8a1c8b4 is described below
commit 8a1c8b451fff5b516296ad9a8f3624087cff7924
Author: Andrew Olson <[email protected]>
AuthorDate: Fri Aug 2 16:47:09 2019 -0500
CRUNCH-688: Fix HFile node affinity for non-default namespace HBase tables
---
.../org/apache/crunch/io/hbase/HFileTargetIT.java | 45 +++++++++++++++++++---
.../crunch/io/hbase/RegionLocationTableTest.java | 2 +-
.../org/apache/crunch/io/hbase/HFileUtils.java | 2 +-
3 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index ffe2177..0b606a2 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Admin;
@@ -113,6 +115,7 @@ import static org.junit.Assert.fail;
public class HFileTargetIT implements Serializable {
private static HBaseTestingUtility HBASE_TEST_UTILITY;
+ private static final String TEST_NAMESPACE = "test_namespace";
private static final byte[] TEST_FAMILY = Bytes.toBytes("test_family");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("count");
private static final Path TEMP_DIR = new Path("/tmp");
@@ -156,16 +159,37 @@ public class HFileTargetIT implements Serializable {
}
private static Table createTable(int splits) throws Exception {
- HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
- return createTable(splits, hcol);
+ return createTable(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, splits);
}
private static Table createTable(int splits, HColumnDescriptor... hcols)
throws Exception {
- TableName tableName = TableName.valueOf(Bytes.toBytes("test_table_" +
RANDOM.nextInt(1000000000)));
+ return createTable(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, splits,
hcols);
+ }
+
+ private static Table createTable(String namespace, int splits) throws
Exception {
+ return createTable(namespace, splits, new HColumnDescriptor(TEST_FAMILY));
+ }
+
+ private static Table createTable(String namespace, int splits,
HColumnDescriptor... hcols) throws Exception {
+ TableName tableName;
+ if (NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR.equals(namespace)) {
+ tableName = TableName.valueOf(Bytes.toBytes("test_table_" +
RANDOM.nextInt(1000000000)));
+ } else {
+ tableName = TableName.valueOf(Bytes.toBytes(namespace +
TableName.NAMESPACE_DELIM +
+ "test_table_" + RANDOM.nextInt(1000000000)));
+ }
HTableDescriptor htable = new HTableDescriptor(tableName);
for (HColumnDescriptor hcol : hcols) {
htable.addFamily(hcol);
}
+
+ if (!NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR.equals(namespace)) {
+ try {
+
HBASE_TEST_UTILITY.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+ } catch (NamespaceExistException e) {
+ // Ignore expected exception
+ }
+ }
return HBASE_TEST_UTILITY.createTable(htable,
Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
}
@@ -202,13 +226,23 @@ public class HFileTargetIT implements Serializable {
@Test
public void testBulkLoad() throws Exception {
+ bulkLoadTest(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR);
+ }
+
+ @Test
+ public void testBulkLoadWithNamespace() throws Exception {
+ bulkLoadTest(TEST_NAMESPACE);
+ }
+
+ private void bulkLoadTest(String namespace) throws Exception {
Pipeline pipeline = new MRPipeline(HFileTargetIT.class,
HBASE_TEST_UTILITY.getConfiguration());
Path inputPath = copyResourceFileToHDFS("shakes.txt");
Path outputPath = getTempPathOnHDFS("out");
byte[] columnFamilyA = Bytes.toBytes("colfamA");
byte[] columnFamilyB = Bytes.toBytes("colfamB");
Admin admin = HBASE_TEST_UTILITY.getAdmin();
- Table testTable = createTable(26, new HColumnDescriptor(columnFamilyA),
new HColumnDescriptor(columnFamilyB));
+ Table testTable = createTable(namespace, 26, new
HColumnDescriptor(columnFamilyA),
+ new HColumnDescriptor(columnFamilyB));
Connection connection = admin.getConnection();
RegionLocator regionLocator =
connection.getRegionLocator(testTable.getName());
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath,
Writables.strings()));
@@ -250,8 +284,9 @@ public class HFileTargetIT implements Serializable {
Path outputPath2 = getTempPathOnHDFS("out2");
Admin admin = HBASE_TEST_UTILITY.getAdmin();
Connection connection = admin.getConnection();
+ // Test both default and non-default namespaces
Table table1 = createTable(26);
- Table table2 = createTable(26);
+ Table table2 = createTable(TEST_NAMESPACE, 26);
RegionLocator regionLocator1 =
connection.getRegionLocator(table1.getName());
RegionLocator regionLocator2 =
connection.getRegionLocator(table2.getName());
LoadIncrementalHFiles loader = new
LoadIncrementalHFiles(HBASE_TEST_UTILITY.getConfiguration());
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java
index fa500bf..0b44994 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java
@@ -40,7 +40,7 @@ import org.junit.Test;
public class RegionLocationTableTest {
- private static final String TABLE_NAME = "DATA_TABLE";
+ private static final String TABLE_NAME = "namespace:DATA_TABLE";
private RegionLocationTable regionLocationTable;
@Before
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 9911397..253ae7a 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -532,7 +532,7 @@ public final class HFileUtils {
table.getName().getNameAsString(),
regionLocator.getAllRegionLocations());
Path regionLocationFilePath = new Path(((DistributedPipeline)
cells.getPipeline()).createTempPath(),
- "regionLocations" + table.getName().getNameAsString());
+ "regionLocations_" + table.getName().getNameAsString().replace(":",
"_"));
writeRegionLocationTable(cells.getPipeline().getConfiguration(),
regionLocationFilePath, regionLocationTable);
for (HColumnDescriptor f : families) {