This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e49c068 [GOBBLIN-1263] Dataset specific Database name for registration
e49c068 is described below
commit e49c068c5f3c9876ff5e89bfd16f3274d0335d00
Author: Lei Sun <[email protected]>
AuthorDate: Fri Sep 11 16:28:05 2020 -0700
[GOBBLIN-1263] Dataset specific Database name for registration
Support a dataset-specific database name for registration.
Move getListOfValuesFromConfigStore into the ConfigStoreUtils utility class.
Closes #3104 from autumnust/datasetLevelDBReg
---
.../hive/policy/HiveRegistrationPolicyBase.java | 15 ++++++---
.../policy/HiveRegistrationPolicyBaseTest.java | 39 ++++++++++++++++++++--
.../extractor/extract/kafka/ConfigStoreUtils.java | 7 ++++
3 files changed, 54 insertions(+), 7 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index ca4d35b..a0270ca 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hive.metastore.TableType;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
@@ -165,6 +165,8 @@ public class HiveRegistrationPolicyBase implements
HiveRegistrationPolicy {
* Obtain Hive database names. The returned {@link Iterable} contains the
database name returned by
* {@link #getDatabaseName(Path)} (if present) plus additional database
names specified in
* {@link #ADDITIONAL_HIVE_DATABASE_NAMES}.
+ * A dataset-specific value of {@link #ADDITIONAL_HIVE_DATABASE_NAMES}, when present, overrides
+ * (rather than extends) the job-level value.
*
*/
protected Iterable<String> getDatabaseNames(Path path) {
@@ -175,7 +177,11 @@ public class HiveRegistrationPolicyBase implements
HiveRegistrationPolicy {
databaseNames.add(databaseName.get());
}
- if
(!Strings.isNullOrEmpty(this.props.getProp(ADDITIONAL_HIVE_DATABASE_NAMES))) {
+ if (configForTopic.isPresent() &&
configForTopic.get().hasPath(ADDITIONAL_HIVE_DATABASE_NAMES)) {
+ databaseNames.addAll(
+
ConfigStoreUtils.getListOfValuesFromConfigStore(configForTopic.get(),
ADDITIONAL_HIVE_DATABASE_NAMES).stream()
+ .map(x -> this.dbNamePrefix + x +
this.dbNameSuffix).collect(Collectors.toList()));
+ } else if
(!Strings.isNullOrEmpty(this.props.getProp(ADDITIONAL_HIVE_DATABASE_NAMES))) {
for (String additionalDbName :
this.props.getPropAsList(ADDITIONAL_HIVE_DATABASE_NAMES)) {
databaseNames.add(this.dbNamePrefix + additionalDbName +
this.dbNameSuffix);
}
@@ -255,9 +261,8 @@ public class HiveRegistrationPolicyBase implements
HiveRegistrationPolicy {
// Searching additional table name from ConfigStore-returned object.
if (primaryTableName.isPresent() && configForTopic.isPresent() &&
configForTopic.get().hasPath(additionalNamesProp)) {
- for (String additionalTableName : Splitter.on(",")
- .trimResults()
- .splitToList(configForTopic.get().getString(additionalNamesProp))) {
+ for (String additionalTableName : ConfigStoreUtils
+ .getListOfValuesFromConfigStore(configForTopic.get(),
additionalNamesProp)) {
String resolvedTableName =
StringUtils.replace(additionalTableName, PRIMARY_TABLE_TOKEN,
primaryTableName.get());
tableNames.add(this.tableNamePrefix + resolvedTableName +
this.tableNameSuffix);
diff --git
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
index d1f3812..b236755 100644
---
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
+++
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
@@ -17,9 +17,11 @@
package org.apache.gobblin.hive.policy;
+import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
@@ -27,10 +29,16 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.spec.SimpleHiveSpec;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static
org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES;
+
/**
* Unit test for {@link HiveRegistrationPolicyBase}
@@ -46,7 +54,7 @@ public class HiveRegistrationPolicyBaseTest {
throws IOException {
State state = new State();
state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
"db1");
-
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
"db2");
+ state.appendToListProp(ADDITIONAL_HIVE_DATABASE_NAMES, "db2");
state.appendToListProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME, "tbl1");
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES,
"tbl2,tbl3");
@@ -71,12 +79,39 @@ public class HiveRegistrationPolicyBaseTest {
examine(spec, "db2", "tbl3");
}
+  // Verifies how additional Hive database names are resolved: a dataset-level value of
+  // ADDITIONAL_HIVE_DATABASE_NAMES (from the config object) must override the job-level value,
+  // and the job-level value must be used when no dataset-level config object is present.
+  @Test
+  public void testGetDatabasesNames() throws Exception {
+    State jobState = new State();
+    jobState.setProp(ADDITIONAL_HIVE_DATABASE_NAMES, "db1");
+
+    Properties properties = new Properties();
+    properties.setProperty(ADDITIONAL_HIVE_DATABASE_NAMES, "db2");
+    Config configObj = ConfigUtils.propertiesToConfig(properties);
+    HiveRegistrationPolicyBase policyBase = new HiveRegistrationPolicyBase(jobState);
+    // Set the dataset-level config object manually instead of fetching it from a config store.
+    policyBase.configForTopic = Optional.of(configObj);
+
+    // Placeholder path; this test only exercises the configured additional database names.
+    File dir = Files.createTempDir();
+    dir.deleteOnExit();
+    Path dummyPath = new Path(dir.getAbsolutePath(), "random");
+
+    // The dataset-level value "db2" replaces, rather than extends, the job-level value "db1".
+    int dbCount = 0;
+    for (String dbName : policyBase.getDatabaseNames(dummyPath)) {
+      Assert.assertEquals(dbName, "db2");
+      dbCount++;
+    }
+    Assert.assertEquals(dbCount, 1);
+
+    // Without a dataset-level config object, the job-level value is used.
+    policyBase.configForTopic = Optional.absent();
+    dbCount = 0;
+    for (String dbName : policyBase.getDatabaseNames(dummyPath)) {
+      Assert.assertEquals(dbName, "db1");
+      dbCount++;
+    }
+    Assert.assertEquals(dbCount, 1);
+  }
+
@Test
public void testGetHiveSpecsWithDBFilter()
throws IOException {
State state = new State();
state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
"db1");
-
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
"db2");
+ state.appendToListProp(ADDITIONAL_HIVE_DATABASE_NAMES, "db2");
state.appendToListProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME, "tbl1");
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES,
"tbl2,tbl3,$PRIMARY_TABLE_col");
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
index a4b0c4f..8768110 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -257,4 +258,10 @@ public class ConfigStoreUtils {
log.warn("None of the blacklist or whitelist tags are provided");
}
}
+
+  /**
+   * Reads a comma-separated string from {@code config} at {@code keyValue} and splits it into a
+   * list, trimming whitespace around each element and dropping empty elements (for example those
+   * produced by a trailing or doubled comma), so callers never receive blank names.
+   *
+   * @param config config object to read from
+   * @param keyValue path of the comma-separated string value inside {@code config}
+   * @return the trimmed, non-empty values in their original order
+   * @throws com.typesafe.config.ConfigException.Missing if {@code keyValue} is not present
+   * @throws com.typesafe.config.ConfigException.WrongType if the value is not convertible to a string
+   */
+  public static List<String> getListOfValuesFromConfigStore(Config config, String keyValue) {
+    return Splitter.on(",")
+        .trimResults()
+        .omitEmptyStrings()
+        .splitToList(config.getString(keyValue));
+  }
}