This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be97acf  [GOBBLIN-852] Reorganize the code for hive registration to isolate function
be97acf is described below

commit be97acfd596b47edaa0e613d15f0ca267e525fc5
Author: Zihan Li <[email protected]>
AuthorDate: Tue Aug 13 10:05:59 2019 -0700

    [GOBBLIN-852] Reorganize the code for hive registration to isolate function
    
    Closes #2708 from ZihanLi58/ETL-8815
---
 .../publisher/HiveRegistrationPublisher.java       | 50 ++++++++++++----------
 1 file changed, 28 insertions(+), 22 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
index 17370b7..0ed6f18 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
@@ -76,9 +76,9 @@ public class HiveRegistrationPublisher extends DataPublisher {
   private static final boolean DEFAULT_PATH_DEDUPE_ENABLED = true;
 
   private final Closer closer = Closer.create();
-  private final HiveRegister hiveRegister;
-  private final ExecutorService hivePolicyExecutor;
-  private final MetricContext metricContext;
+  protected final HiveRegister hiveRegister;
+  protected final ExecutorService hivePolicyExecutor;
+  protected final MetricContext metricContext;
 
   /**
    * The configuration to determine if path deduplication should be enabled 
during Hive Registration process.
@@ -88,7 +88,7 @@ public class HiveRegistrationPublisher extends DataPublisher {
    *
    * e.g. In streaming mode, there could be cases that files(e.g. avro) under 
single topic folder carry different schema.
    */
-  private boolean isPathDedupeEnabled;
+  protected boolean isPathDedupeEnabled;
 
   /**
    * Make the deduplication of path to be registered in the Publisher level,
@@ -118,25 +118,14 @@ public class HiveRegistrationPublisher extends DataPublisher {
     }
   }
 
-  @Deprecated
-  @Override
-  public void initialize() throws IOException {}
-
-  /**
-   * @param states This is a collection of TaskState.
-   */
-  @Override
-  public void publishData(Collection<? extends WorkUnitState> states) throws 
IOException {
-    CompletionService<Collection<HiveSpec>> completionService =
-        new ExecutorCompletionService<>(this.hivePolicyExecutor);
-
+  protected int computeSpecs(Collection<? extends WorkUnitState> states, 
CompletionService<Collection<HiveSpec>> completionService) {
     // Each state in states is task-level State, while superState is the 
Job-level State.
     // Using both State objects to distinguish each HiveRegistrationPolicy so 
that
     // they can carry task-level information to pass into Hive Partition and 
its corresponding Hive Table.
 
     // Here all runtime task-level props are injected into superstate which 
installed in each Policy Object.
     // runtime.props are comma-separated props collected in runtime.
-    int toRegisterPathCount = 0 ;
+    int toRegisterPathCount = 0;
     for (State state:states) {
       State taskSpecificState = state;
       if (state.contains(ConfigurationKeys.PUBLISHER_DIRS)) {
@@ -147,17 +136,20 @@ public class HiveRegistrationPublisher extends DataPublisher {
               
LIST_SPLITTER_COMMA.splitToList(this.hiveRegister.getProps().getUpstreamDataAttrName().get())){
             if (state.contains(attrName)) {
               
taskSpecificState.appendToListProp(HiveMetaStoreUtils.RUNTIME_PROPS,
-                    attrName + ":" + state.getProp(attrName));
+                  attrName + ":" + state.getProp(attrName));
             }
           }
         }
 
         final HiveRegistrationPolicy policy = 
HiveRegistrationPolicyBase.getPolicy(taskSpecificState);
         for ( final String path : 
state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS) ) {
-          if (isPathDedupeEnabled && 
pathsToRegisterFromSingleState.contains(path)){
-            continue;
+          if (isPathDedupeEnabled){
+            if (pathsToRegisterFromSingleState.contains(path)) {
+              continue;
+            } else {
+              pathsToRegisterFromSingleState.add(path);
+            }
           }
-          pathsToRegisterFromSingleState.add(path);
           toRegisterPathCount += 1;
           completionService.submit(new Callable<Collection<HiveSpec>>() {
             @Override
@@ -169,8 +161,22 @@ public class HiveRegistrationPublisher extends DataPublisher {
           });
         }
       }
-      else continue;
     }
+    return toRegisterPathCount;
+  }
+  @Deprecated
+  @Override
+  public void initialize() throws IOException {}
+
+  /**
+   * @param states This is a collection of TaskState.
+   */
+  @Override
+  public void publishData(Collection<? extends WorkUnitState> states) throws 
IOException {
+    CompletionService<Collection<HiveSpec>> completionService =
+        new ExecutorCompletionService<>(this.hivePolicyExecutor);
+
+    int toRegisterPathCount = computeSpecs(states, completionService);
     for (int i = 0; i < toRegisterPathCount; i++) {
       try {
         for (HiveSpec spec : completionService.take().get()) {

Reply via email to