This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5f8bf97 [HUDI-671] Added unit-test for HBaseIndex (#1381)
5f8bf97 is described below
commit 5f8bf970058d5915af1910dbf917a33356d06af2
Author: Prashant Wason <[email protected]>
AuthorDate: Sat Mar 7 16:48:43 2020 -0800
[HUDI-671] Added unit-test for HBaseIndex (#1381)
---
.../java/org/apache/hudi/index/TestHbaseIndex.java | 133 ++++++++++++++++++++-
1 file changed, 129 insertions(+), 4 deletions(-)
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 2893947..53adf6c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -22,9 +22,11 @@ import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -38,6 +40,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -57,6 +60,7 @@ import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.mockito.Mockito;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -65,6 +69,7 @@ import scala.Tuple2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.times;
@@ -93,7 +98,10 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
@BeforeClass
public static void init() throws Exception {
// Initialize HbaseMiniCluster
- utility = new HBaseTestingUtility();
+ hbaseConfig = HBaseConfiguration.create();
+ hbaseConfig.set("zookeeper.znode.parent", "/hudi-hbase-test");
+
+ utility = new HBaseTestingUtility(hbaseConfig);
utility.startMiniCluster();
hbaseConfig = utility.getConnection().getConfiguration();
utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
@@ -389,6 +397,117 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(),
100), 0.0f);
}
+ @Test
+ public void testSmallBatchSize() throws Exception {
+ String newCommitTime = "001";
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ // Load to memory
+ HoodieWriteConfig config = getConfig(2);
+ HBaseIndex index = new HBaseIndex(config);
+ try (HoodieWriteClient writeClient = getWriteClient(config);) {
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
+
+ // Test tagLocation without any entries in index
+ JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc,
hoodieTable);
+ assert (javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size() == 0);
+
+ // Insert 200 records
+ writeClient.startCommitWithTime(newCommitTime);
+ JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords,
newCommitTime);
+ assertNoWriteErrors(writeStatues.collect());
+
+ // Now tagLocation for these records, hbaseIndex should not tag them
since it was a failed
+ // commit
+ javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assert (javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size() == 0);
+
+ // Now commit this & update location of records inserted and validate no
errors
+ writeClient.commit(newCommitTime, writeStatues);
+ // Now tagLocation for these records, hbaseIndex should tag them
correctly
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+ javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(200, javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size());
+ assertEquals(200, javaRDD.map(record ->
record.getKey().getRecordKey()).distinct().count());
+ assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation()
!= null
+ &&
record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+ }
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ String newCommitTime = "001";
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ // Load to memory
+ HoodieWriteConfig config = getConfig();
+ HBaseIndex index = new HBaseIndex(config);
+ try (HoodieWriteClient writeClient = getWriteClient(config);) {
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
+
+ // Test tagLocation without any entries in index
+ JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc,
hoodieTable);
+ assert (javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size() == 0);
+
+ // Insert records
+ writeClient.startCommitWithTime(newCommitTime);
+ JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords,
newCommitTime);
+ assertNoWriteErrors(writeStatues.collect());
+ writeClient.commit(newCommitTime, writeStatues);
+
+ // Now tagLocation for these records, hbaseIndex should tag them
correctly
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+ javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(10, javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size());
+ assertEquals(10, javaRDD.map(record ->
record.getKey().getRecordKey()).distinct().count());
+ assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation()
!= null
+ &&
record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+
+ // Delete all records. This has to be done directly as deleting index
entries
+ // is not implemented via HoodieWriteClient
+ Option recordMetadata = Option.empty();
+ JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
+ WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
+ w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new
HoodieRecord(r.getKey(), null), recordMetadata));
+ assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
+ newWriteStatus.setStat(new HoodieWriteStat());
+ return newWriteStatus;
+ });
+ JavaRDD<WriteStatus> deleteStatus =
index.updateLocation(deleteWriteStatues, jsc, hoodieTable);
+ assertEquals(deleteStatus.count(), deleteWriteStatues.count());
+ assertNoWriteErrors(deleteStatus.collect());
+
+ // Ensure no records can be tagged
+ javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+ assertEquals(0, javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size());
+ assertEquals(10, javaRDD.map(record ->
record.getKey().getRecordKey()).distinct().count());
+ assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() !=
null
+ &&
record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+ }
+ }
+
+ @Test
+ public void testFeatureSupport() throws Exception {
+ HoodieWriteConfig config = getConfig();
+ HBaseIndex index = new HBaseIndex(config);
+
+ assertTrue(index.canIndexLogFiles());
+ try {
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
+ index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(),
1), jsc, hoodieTable);
+ fail("HbaseIndex supports fetchRecordLocation");
+ } catch (UnsupportedOperationException ex) {
+ // Expected so ignore
+ ex.getStackTrace();
+ }
+ }
+
private WriteStatus getSampleWriteStatus(final int numInserts, final int
numUpdateWrites) {
final WriteStatus writeStatus = new WriteStatus(false, 0.1);
HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
@@ -406,10 +525,14 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
}
private HoodieWriteConfig getConfig() {
- return getConfigBuilder().build();
+ return getConfigBuilder(100).build();
+ }
+
+ private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
+ return getConfigBuilder(hbaseIndexBatchSize).build();
}
- private HoodieWriteConfig.Builder getConfigBuilder() {
+ private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
return
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(1, 1)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024)
@@ -419,8 +542,10 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
.hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+ .hbaseIndexPutBatchSizeAutoCompute(true)
+ .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent",
""))
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
- .hbaseIndexGetBatchSize(100).build())
+ .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
.build());
}
}