This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e49c068  [GOBBLIN-1263] Dataset specific Database name for registration
e49c068 is described below

commit e49c068c5f3c9876ff5e89bfd16f3274d0335d00
Author: Lei Sun <[email protected]>
AuthorDate: Fri Sep 11 16:28:05 2020 -0700

    [GOBBLIN-1263] Dataset specific Database name for registration
    
    Dataset specific Database name for registration
    
    move the getListOfValuesFromConfigStore to utils
    class
    
    Closes #3104 from autumnust/datasetLevelDBReg
---
 .../hive/policy/HiveRegistrationPolicyBase.java    | 15 ++++++---
 .../policy/HiveRegistrationPolicyBaseTest.java     | 39 ++++++++++++++++++++--
 .../extractor/extract/kafka/ConfigStoreUtils.java  |  7 ++++
 3 files changed, 54 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index ca4d35b..a0270ca 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hive.metastore.TableType;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
@@ -165,6 +165,8 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
    * Obtain Hive database names. The returned {@link Iterable} contains the 
database name returned by
    * {@link #getDatabaseName(Path)} (if present) plus additional database 
names specified in
    * {@link #ADDITIONAL_HIVE_DATABASE_NAMES}.
+   * Note that a dataset-specific configuration, when present, overrides the job 
configuration for the value of
+   * {@link #ADDITIONAL_HIVE_DATABASE_NAMES}.
    *
    */
   protected Iterable<String> getDatabaseNames(Path path) {
@@ -175,7 +177,11 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
       databaseNames.add(databaseName.get());
     }
 
-    if 
(!Strings.isNullOrEmpty(this.props.getProp(ADDITIONAL_HIVE_DATABASE_NAMES))) {
+    if (configForTopic.isPresent() && 
configForTopic.get().hasPath(ADDITIONAL_HIVE_DATABASE_NAMES)) {
+      databaseNames.addAll(
+          
ConfigStoreUtils.getListOfValuesFromConfigStore(configForTopic.get(), 
ADDITIONAL_HIVE_DATABASE_NAMES).stream()
+          .map(x -> this.dbNamePrefix + x + 
this.dbNameSuffix).collect(Collectors.toList()));
+    } else if 
(!Strings.isNullOrEmpty(this.props.getProp(ADDITIONAL_HIVE_DATABASE_NAMES))) {
       for (String additionalDbName : 
this.props.getPropAsList(ADDITIONAL_HIVE_DATABASE_NAMES)) {
         databaseNames.add(this.dbNamePrefix + additionalDbName + 
this.dbNameSuffix);
       }
@@ -255,9 +261,8 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
 
     // Searching additional table name from ConfigStore-returned object.
     if (primaryTableName.isPresent() && configForTopic.isPresent() && 
configForTopic.get().hasPath(additionalNamesProp)) {
-      for (String additionalTableName : Splitter.on(",")
-          .trimResults()
-          .splitToList(configForTopic.get().getString(additionalNamesProp))) {
+      for (String additionalTableName : ConfigStoreUtils
+          .getListOfValuesFromConfigStore(configForTopic.get(), 
additionalNamesProp)) {
         String resolvedTableName =
             StringUtils.replace(additionalTableName, PRIMARY_TABLE_TOKEN, 
primaryTableName.get());
         tableNames.add(this.tableNamePrefix + resolvedTableName + 
this.tableNameSuffix);
diff --git 
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
 
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
index d1f3812..b236755 100644
--- 
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
+++ 
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.gobblin.hive.policy;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
@@ -27,10 +29,16 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.spec.SimpleHiveSpec;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES;
+
 
 /**
  * Unit test for {@link HiveRegistrationPolicyBase}
@@ -46,7 +54,7 @@ public class HiveRegistrationPolicyBaseTest {
       throws IOException {
     State state = new State();
     state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, 
"db1");
-    
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
 "db2");
+    state.appendToListProp(ADDITIONAL_HIVE_DATABASE_NAMES, "db2");
 
     state.appendToListProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME, "tbl1");
     
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES, 
"tbl2,tbl3");
@@ -71,12 +79,39 @@ public class HiveRegistrationPolicyBaseTest {
     examine(spec, "db2", "tbl3");
   }
 
+  // Tests fetching additional Hive databases from the config object. 
Specifically, we verify that the dataset-level DB
+  // config overrides the same configuration set at the job level.
+  public void testGetDatabasesNames() throws Exception {
+    State jobState = new State();
+    jobState.setProp(ADDITIONAL_HIVE_DATABASE_NAMES, "db1");
+
+    Properties properties = new Properties();
+    properties.setProperty(ADDITIONAL_HIVE_DATABASE_NAMES, "db2");
+    Config configObj = ConfigUtils.propertiesToConfig(properties);
+    HiveRegistrationPolicyBase policyBase = new 
HiveRegistrationPolicyBase(jobState);
+    // Setting the config object manually.
+    policyBase.configForTopic = Optional.fromNullable(configObj);
+
+    // Construct a random Path
+    File dir = Files.createTempDir();
+    dir.deleteOnExit();
+    Path dummyPath = new Path(dir.getAbsolutePath() + " /random");
+
+    int dbCount = 0 ;
+    for (String dbName : policyBase.getDatabaseNames(dummyPath)) {
+      Assert.assertEquals(dbName, "db2");
+      dbCount += 1;
+    }
+
+    Assert.assertEquals(dbCount, 1);
+  }
+
   @Test
   public void testGetHiveSpecsWithDBFilter()
       throws IOException {
     State state = new State();
     state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, 
"db1");
-    
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
 "db2");
+    state.appendToListProp(ADDITIONAL_HIVE_DATABASE_NAMES, "db2");
 
     state.appendToListProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME, "tbl1");
     
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES, 
"tbl2,tbl3,$PRIMARY_TABLE_col");
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
index a4b0c4f..8768110 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
@@ -257,4 +258,10 @@ public class ConfigStoreUtils {
       log.warn("None of the blacklist or whitelist tags are provided");
     }
   }
+
+  public static List<String> getListOfValuesFromConfigStore(Config config, 
String keyValue) {
+    return Splitter.on(",")
+        .trimResults()
+        .splitToList(config.getString(keyValue));
+  }
 }

Reply via email to