This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4902e [HUDI-437] Support user-defined index (#1408)
0a4902e is described below
commit 0a4902eccece1df959946fcb7379a94fc5fe0784
Author: leesf <[email protected]>
AuthorDate: Wed Mar 18 10:27:40 2020 +0800
[HUDI-437] Support user-defined index (#1408)
* [hotfix] set default value for index class config
* `hoodie.index.class`, when set to a non-empty value, takes precedence over `hoodie.index.type`
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 9 +++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++
.../java/org/apache/hudi/index/HoodieIndex.java | 10 +++
.../org/apache/hudi/index/TestHoodieIndex.java | 84 ++++++++++++++++++++++
4 files changed, 107 insertions(+)
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index db83498..00c7605 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -37,6 +37,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String INDEX_TYPE_PROP = "hoodie.index.type";
public static final String DEFAULT_INDEX_TYPE =
HoodieIndex.IndexType.BLOOM.name();
+ public static final String INDEX_CLASS_PROP = "hoodie.index.class";
+ public static final String DEFAULT_INDEX_CLASS = "";
+
// ***** Bloom Index configs *****
public static final String BLOOM_FILTER_NUM_ENTRIES =
"hoodie.index.bloom.num_entries";
public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = "60000";
@@ -117,6 +120,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withIndexClass(String indexClass) {
+ props.setProperty(INDEX_CLASS_PROP, indexClass);
+ return this;
+ }
+
public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig
hBaseIndexConfig) {
props.putAll(hBaseIndexConfig.getProps());
return this;
@@ -195,6 +203,7 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public HoodieIndexConfig build() {
HoodieIndexConfig config = new HoodieIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
+ setDefaultOnCondition(props, !props.containsKey(INDEX_CLASS_PROP),
INDEX_CLASS_PROP, DEFAULT_INDEX_CLASS);
setDefaultOnCondition(props,
!props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES,
DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_FPP),
BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP);
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f88d96a..04c1dfd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -309,6 +309,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return
HoodieIndex.IndexType.valueOf(props.getProperty(HoodieIndexConfig.INDEX_TYPE_PROP));
}
+ public String getIndexClass() {
+ return props.getProperty(HoodieIndexConfig.INDEX_CLASS_PROP);
+ }
+
public int getBloomFilterNumEntries() {
return
Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES));
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
index b18cf45..8305a7b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
@@ -51,6 +53,14 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
public static <T extends HoodieRecordPayload> HoodieIndex<T>
createIndex(HoodieWriteConfig config,
JavaSparkContext jsc) throws HoodieIndexException {
+ // first use index class config to create index.
+ if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
+ Object instance = ReflectionUtils.loadClass(config.getIndexClass(),
config);
+ if (!(instance instanceof HoodieIndex)) {
+ throw new HoodieIndexException(config.getIndexClass() + " is not a
subclass of HoodieIndex");
+ }
+ return (HoodieIndex) instance;
+ }
switch (config.getIndexType()) {
case HBASE:
return new HBaseIndex<>(config);
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 91435f8..46239db 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -18,20 +18,33 @@
package org.apache.hudi.index;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieClientTestHarness;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
import org.apache.hudi.index.hbase.HBaseIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class TestHoodieIndex extends HoodieClientTestHarness {
@@ -67,5 +80,76 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
config = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof
HoodieGlobalBloomIndex);
+
+ config = clientConfigBuilder.withPath(basePath)
+
.withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build();
+ assertTrue(HoodieIndex.createIndex(config, jsc) instanceof
DummyHoodieIndex);
+
+ config = clientConfigBuilder.withPath(basePath)
+
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build();
+ try {
+ HoodieIndex.createIndex(config, jsc);
+ fail("exception is expected");
+ } catch (HoodieIndexException e) {
+ assertTrue(e.getMessage().contains("is not a subclass of HoodieIndex"));
+ }
+
+ config = clientConfigBuilder.withPath(basePath)
+
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithoutConstructor.class.getName()).build()).build();
+ try {
+ HoodieIndex.createIndex(config, jsc);
+ fail("exception is expected");
+ } catch (HoodieException e) {
+ assertTrue(e.getMessage().contains("Unable to instantiate class"));
+ }
+ }
+
+ public static class DummyHoodieIndex<T extends HoodieRecordPayload> extends
HoodieIndex<T> {
+ public DummyHoodieIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public JavaPairRDD<HoodieKey, Option<Pair<String, String>>>
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) {
+ return null;
+ }
+
+ @Override
+ public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>>
recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) throws
HoodieIndexException {
+ return null;
+ }
+
+ @Override
+ public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus>
writeStatusRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) throws
HoodieIndexException {
+ return null;
+ }
+
+ @Override
+ public boolean rollbackCommit(String commitTime) {
+ return false;
+ }
+
+ @Override
+ public boolean isGlobal() {
+ return false;
+ }
+
+ @Override
+ public boolean canIndexLogFiles() {
+ return false;
+ }
+
+ @Override
+ public boolean isImplicitWithStorage() {
+ return false;
+ }
+ }
+
+ public static class IndexWithConstructor {
+ public IndexWithConstructor(HoodieWriteConfig config) {}
+ }
+
+ public static class IndexWithoutConstructor {
+
}
}