This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 423990a  [GOBBLIN-753] Refactor HiveRegistrationPolicyBase to surface configStore object
423990a is described below

commit 423990a0a1cfb29f3e6b1e74c8f5fb35efd1690d
Author: autumnust <[email protected]>
AuthorDate: Wed Apr 24 14:28:34 2019 -0700

    [GOBBLIN-753] Refactor HiveRegistrationPolicyBase to surface configStore object
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    Some refactoring in `HiveRegistrationPolicyBase`
    to make the topic-specific configStore object
    available to extending classes.
    
    ### JIRA
    - [x] My PR addresses the following [Gobblin JIRA]
        - https://issues.apache.org/jira/browse/GOBBLIN-753
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Refactor the configStore object in
    HiveRegistrationPolicyBase to make it available to
    extending classes
    
    Refactor more
    
    Closes #2618 from
    autumnust/timeCutOnConfigStoreTableName
---
 .../java/org/apache/gobblin/hive/HiveRegister.java | 20 +++++------
 .../hive/policy/HiveRegistrationPolicyBase.java    | 41 ++++++++++++++--------
 2 files changed, 34 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index 08a4f10..93dffe2 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.hive;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
@@ -26,17 +32,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.HiveRegistrationUnit.Column;
@@ -47,8 +45,6 @@ import org.apache.gobblin.hive.spec.HiveSpecWithPredicates;
 import org.apache.gobblin.hive.spec.activity.Activity;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
 
 
 /**
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index 1a4ab1d..ee1eb03 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -88,6 +88,8 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
   protected static final ConfigClient configClient =
       
org.apache.gobblin.config.client.ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
 
+  protected Optional<Config> configForTopic = Optional.<Config>absent();
+
   /**
    * A valid db or table name should start with an alphanumeric character, and 
contains only
    * alphanumeric characters and '_'.
@@ -132,6 +134,13 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
     this.tableNameSuffix = props.getProp(HIVE_TABLE_NAME_SUFFIX, 
StringUtils.EMPTY);
     this.emptyInputPathFlag = 
props.getPropAsBoolean(MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, false);
     this.metricContext = Instrumented.getMetricContext(props, 
HiveRegister.class);
+
+    // Get Topic-specific config object doesn't require any runtime-set 
properties in prop object, safe to initialize
+    // in constructor.
+    Timer.Context context = 
this.metricContext.timer(CONFIG_FOR_TOPIC_TIMER).time();
+    configForTopic =
+        ConfigStoreUtils.getConfigForTopic(this.props.getProperties(), 
KafkaSource.TOPIC_NAME, this.configClient);
+    context.close();
   }
 
   /**
@@ -175,8 +184,8 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
   }
 
   /**
-   * This method first tries to obtain the database name from {@link 
#HIVE_TABLE_NAME}.
-   * If this property is not specified, it then tries to obtain the database 
name using
+   * This method first tries to obtain the table name from {@link 
#HIVE_TABLE_NAME}.
+   * If this property is not specified, it then tries to obtain the table name 
using
    * the first group of {@link #HIVE_TABLE_REGEX}.
    */
   protected Optional<String> getTableName(Path path) {
@@ -234,13 +243,6 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
     if ((primaryTableName = getTableName(path)).isPresent() && 
!dbPrefix.isPresent()) {
       tableNames.add(primaryTableName.get());
     }
-    Optional<Config> configForTopic = Optional.<Config>absent();
-    if (primaryTableName.isPresent()) {
-      Timer.Context context = 
this.metricContext.timer(CONFIG_FOR_TOPIC_TIMER).time();
-      configForTopic =
-          ConfigStoreUtils.getConfigForTopic(this.props.getProperties(), 
KafkaSource.TOPIC_NAME, this.configClient);
-      context.close();
-    }
 
     String additionalNamesProp;
     if (dbPrefix.isPresent()) {
@@ -249,7 +251,8 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
       additionalNamesProp = ADDITIONAL_HIVE_TABLE_NAMES;
     }
 
-    if (configForTopic.isPresent() && 
configForTopic.get().hasPath(additionalNamesProp)) {
+    // Searching additional table name from ConfigStore-returned object.
+    if (primaryTableName.isPresent() && configForTopic.isPresent() && 
configForTopic.get().hasPath(additionalNamesProp)) {
       for (String additionalTableName : Splitter.on(",")
           .trimResults()
           .splitToList(configForTopic.get().getString(additionalNamesProp))) {
@@ -357,11 +360,7 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
     }
 
     // Setting table-level props.
-    State tableProps = new State(this.props.getTablePartitionProps());
-    if (this.props.getRuntimeTableProps().isPresent()){
-      tableProps.setProp(HiveMetaStoreUtils.RUNTIME_PROPS, 
this.props.getRuntimeTableProps().get());
-    }
-    table.setProps(tableProps);
+    table.setProps(getRuntimePropsEnrichedTblProps());
 
     table.setStorageProps(this.props.getStorageProps());
     table.setSerDeProps(this.props.getSerdeProps());
@@ -371,6 +370,18 @@ public class HiveRegistrationPolicyBase implements 
HiveRegistrationPolicy {
     return table;
   }
 
+  /**
+   * Enrich the table-level properties with properties carried over from 
ingestion runtime.
+   * Extend this class to add more runtime properties if required.
+   */
+  protected State getRuntimePropsEnrichedTblProps() {
+    State tableProps = new State(this.props.getTablePartitionProps());
+    if (this.props.getRuntimeTableProps().isPresent()){
+      tableProps.setProp(HiveMetaStoreUtils.RUNTIME_PROPS, 
this.props.getRuntimeTableProps().get());
+    }
+    return tableProps;
+  }
+
   protected Optional<HivePartition> getPartition(Path path, HiveTable table) 
throws IOException {
     return Optional.<HivePartition> absent();
   }

Reply via email to