This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ffc72b  [GOBBLIN-954] Added support to swap different HiveRegistrationPublishers
7ffc72b is described below

commit 7ffc72bcd0a11d1ac8ab8c23d93645ff441cbd5f
Author: vbohra <[email protected]>
AuthorDate: Mon Nov 11 10:08:02 2019 -0800

    [GOBBLIN-954] Added support to swap different HiveRegistrationPublishers
    
    Closes #2803 from vikrambohra/fileSignal
---
 .../gobblin/configuration/ConfigurationKeys.java   |  85 ++++++++++-------
 .../gobblin/publisher/BaseDataPublisher.java       |   1 +
 .../publisher/HiveRegistrationPublisher.java       |  57 ++++++++----
 .../java/org/apache/gobblin/hive/HiveRegister.java | 102 +++++++++++++--------
 ...iveRegTaskStateCollectorServiceHandlerImpl.java |  24 ++++-
 5 files changed, 171 insertions(+), 98 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index cd5b21f..bc810a6 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -65,7 +65,8 @@ public class ConfigurationKeys {
   // File system URI for file-system-based task store
   public static final String STATE_STORE_FS_URI_KEY = "state.store.fs.uri";
   // Thread pool size for listing dataset state store
-  public static final String THREADPOOL_SIZE_OF_LISTING_FS_DATASET_STATESTORE 
= "state.store.threadpoolSizeOfListingFsDatasetStateStore";
+  public static final String THREADPOOL_SIZE_OF_LISTING_FS_DATASET_STATESTORE =
+      "state.store.threadpoolSizeOfListingFsDatasetStateStore";
   public static final int 
DEFAULT_THREADPOOL_SIZE_OF_LISTING_FS_DATASET_STATESTORE = 10;
   // Enable / disable state store
   public static final String STATE_STORE_ENABLED = "state.store.enabled";
@@ -207,7 +208,6 @@ public class ConfigurationKeys {
   public static final boolean DEFAULT_CLEANUP_OLD_JOBS_DATA = false;
   public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY = 
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
 
-
   public static final String QUEUED_TASK_TIME_MAX_SIZE = 
"taskexecutor.queued_task_time.history.max_size";
   public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
   public static final String QUEUED_TASK_TIME_MAX_AGE = 
"taskexecutor.queued_task_time.history.max_age";
@@ -305,7 +305,6 @@ public class ConfigurationKeys {
   public static final String EXTRACT_ID_TIME_ZONE = 
"extract.extractIdTimeZone";
   public static final String DEFAULT_EXTRACT_ID_TIME_ZONE = "UTC";
 
-
   /**
    * Converter configuration properties.
    */
@@ -323,7 +322,8 @@ public class ConfigurationKeys {
   public static final String CONVERTER_AVRO_EXTRACTOR_FIELD_PATH = 
"converter.avro.extractor.field.path";
   public static final String CONVERTER_STRING_FILTER_PATTERN = 
"converter.string.filter.pattern";
   public static final String CONVERTER_STRING_SPLITTER_DELIMITER = 
"converter.string.splitter.delimiter";
-  public static final String CONVERTER_STRING_SPLITTER_SHOULD_TRIM_RESULTS = 
"converter.string.splitter.shouldITrimResults";
+  public static final String CONVERTER_STRING_SPLITTER_SHOULD_TRIM_RESULTS =
+      "converter.string.splitter.shouldITrimResults";
   public static final boolean 
DEFAULT_CONVERTER_STRING_SPLITTER_SHOULD_TRIM_RESULTS = false;
   public static final String CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR = 
"converter.csv.to.json.enclosedchar";
   public static final String DEFAULT_CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR = "\0";
@@ -388,7 +388,6 @@ public class ConfigurationKeys {
   public static final String SIMPLE_WRITER_PREPEND_SIZE = 
"simple.writer.prepend.size";
   public static final String WRITER_ADD_TASK_TIMESTAMP = WRITER_PREFIX + 
".addTaskTimestamp";
 
-
   // Internal use only - used to send metadata to publisher
   public static final String WRITER_METADATA_KEY = WRITER_PREFIX + 
"._internal.metadata";
   public static final String WRITER_PARTITION_PATH_KEY = WRITER_PREFIX + 
"._internal.partition.path";
@@ -417,7 +416,8 @@ public class ConfigurationKeys {
   public static final boolean DEFAULT_CLEAN_ERR_DIR = false;
   /** Set the approximate max number of records to write in err_file for each 
task. Note the actual number of records
    * written may be anything from 0 to about the value set + 100. */
-  public static final String ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK = 
QUALITY_CHECKER_PREFIX + ".row.errFile.recordsPerTask";
+  public static final String ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK =
+      QUALITY_CHECKER_PREFIX + ".row.errFile.recordsPerTask";
   public static final long DEFAULT_ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK = 
1000000;
 
   /**
@@ -446,13 +446,15 @@ public class ConfigurationKeys {
    * PUBLISH_WRITER_METADATA_MERGER_NAME_KEY: Class to use to merge 
writer-generated metadata.
    */
   public static final String DATA_PUBLISH_WRITER_METADATA_KEY = 
DATA_PUBLISHER_PREFIX + ".metadata.publish.writer";
-  public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_KEY = 
DATA_PUBLISHER_PREFIX + ".metadata.publish.writer.merger.class";
+  public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_KEY =
+      DATA_PUBLISHER_PREFIX + ".metadata.publish.writer.merger.class";
 
   /**
    * Metadata configuration properties used internally
    */
-  public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_DEFAULT 
= "org.apache.gobblin.metadata.types.GlobalMetadataJsonMerger";
-  public static final String DATA_PUBLISHER_METADATA_OUTPUT_DIR =  
DATA_PUBLISHER_PREFIX + ".metadata.output.dir";
+  public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_DEFAULT =
+      "org.apache.gobblin.metadata.types.GlobalMetadataJsonMerger";
+  public static final String DATA_PUBLISHER_METADATA_OUTPUT_DIR = 
DATA_PUBLISHER_PREFIX + ".metadata.output.dir";
   //Metadata String in the configuration file
   public static final String DATA_PUBLISHER_METADATA_STR = 
DATA_PUBLISHER_PREFIX + ".metadata.string";
   public static final String DATA_PUBLISHER_METADATA_OUTPUT_FILE = 
DATA_PUBLISHER_PREFIX + ".metadata.output_file";
@@ -467,7 +469,8 @@ public class ConfigurationKeys {
   public static final String DEFAULT_DATA_PUBLISHER_TYPE = 
"org.apache.gobblin.publisher.BaseDataPublisher";
   public static final String DATA_PUBLISHER_FILE_SYSTEM_URI = 
DATA_PUBLISHER_PREFIX + ".fs.uri";
   public static final String DATA_PUBLISHER_FINAL_DIR = DATA_PUBLISHER_PREFIX 
+ ".final.dir";
-  public static final String DATA_PUBLISHER_APPEND_EXTRACT_TO_FINAL_DIR = 
DATA_PUBLISHER_PREFIX + ".appendExtractToFinalDir";
+  public static final String DATA_PUBLISHER_APPEND_EXTRACT_TO_FINAL_DIR =
+      DATA_PUBLISHER_PREFIX + ".appendExtractToFinalDir";
   public static final boolean 
DEFAULT_DATA_PUBLISHER_APPEND_EXTRACT_TO_FINAL_DIR = true;
   public static final String DATA_PUBLISHER_REPLACE_FINAL_DIR = 
DATA_PUBLISHER_PREFIX + ".replace.final.dir";
   public static final String DATA_PUBLISHER_FINAL_NAME = DATA_PUBLISHER_PREFIX 
+ ".final.name";
@@ -480,6 +483,8 @@ public class ConfigurationKeys {
   public static final String PUBLISHER_DIRS = DATA_PUBLISHER_PREFIX + 
".output.dirs";
   public static final String DATA_PUBLISHER_CAN_BE_SKIPPED = 
DATA_PUBLISHER_PREFIX + ".canBeSkipped";
   public static final boolean DEFAULT_DATA_PUBLISHER_CAN_BE_SKIPPED = false;
+  public static final String PUBLISHER_LATEST_FILE_ARRIVAL_TIMESTAMP =
+      DATA_PUBLISHER_PREFIX + ".latest.file.arrival.timestamp";
 
   /**
    * Configuration properties used by the extractor.
@@ -561,7 +566,8 @@ public class ConfigurationKeys {
   public static final String SOURCE_FILEBASED_OPTIONAL_DOWNLOADER_CLASS = 
"source.filebased.downloader.class";
   public static final String SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH = 
"source.filebased.encrypted";
 
-  public static final String SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED = 
"source.filebased.fs.prior.snapshot.required";
+  public static final String SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED =
+      "source.filebased.fs.prior.snapshot.required";
   public static final boolean 
DEFAULT_SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED = false;
 
   /**
@@ -604,7 +610,6 @@ public class ConfigurationKeys {
   public static final String SOURCE_CONN_REFRESH_TOKEN = SOURCE_CONN_PREFIX + 
"refresh.token";
   public static final String SOURCE_CONN_DECRYPT_CLIENT_SECRET = 
SOURCE_CONN_PREFIX + "decrypt.client.id.secret";
 
-
   /**
    * Source default configurations.
    */
@@ -647,7 +652,8 @@ public class ConfigurationKeys {
   /**
    * Set to true so that job still proceed if TaskStateCollectorService failed.
    */
-  public static final String 
JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE = 
"job.proceed.onTaskStateCollectorServiceFailure";
+  public static final String 
JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE =
+      "job.proceed.onTaskStateCollectorServiceFailure";
 
   /**
    * Configuration properties for email settings.
@@ -671,10 +677,12 @@ public class ConfigurationKeys {
   public static final String METRICS_REPORT_INTERVAL_KEY = 
METRICS_CONFIGURATIONS_PREFIX + "report.interval";
   public static final String DEFAULT_METRICS_REPORT_INTERVAL = 
Long.toString(TimeUnit.SECONDS.toMillis(30));
   public static final String METRIC_CONTEXT_NAME_KEY = "metrics.context.name";
-  public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 
METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
+  public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES =
+      METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
   public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15;
   public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX = 
"metrics.reporting";
-  public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX = 
METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
+  public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX =
+      METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
   // File-based reporting
   public static final String METRICS_REPORTING_FILE_ENABLED_KEY =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.file.enabled";
@@ -684,7 +692,7 @@ public class ConfigurationKeys {
   public static final String DEFAULT_METRICS_FILE_SUFFIX = "";
   public static final String FAILURE_REPORTING_FILE_ENABLED_KEY = 
"failure.reporting.file.enabled";
   public static final String DEFAULT_FAILURE_REPORTING_FILE_ENABLED = 
Boolean.toString(true);
-  public static final String FAILURE_LOG_DIR_KEY =  "failure.log.dir";
+  public static final String FAILURE_LOG_DIR_KEY = "failure.log.dir";
 
   // JMX-based reporting
   public static final String METRICS_REPORTING_JMX_ENABLED_KEY =
@@ -695,11 +703,15 @@ public class ConfigurationKeys {
   public static final String METRICS_REPORTING_KAFKA_ENABLED_KEY =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.enabled";
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_ENABLED = 
Boolean.toString(false);
-  public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS = 
"org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
+  public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS =
+      "org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
   public static final String METRICS_REPORTING_KAFKA_FORMAT = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format";
-  public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
-  public static final String METRICS_REPORTING_KAFKAPUSHERKEYS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafkaPusherKeys";
-  public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys";
+  public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
+  public static final String METRICS_REPORTING_KAFKAPUSHERKEYS =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.kafkaPusherKeys";
+  public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys";
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json";
   public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY =
       METRICS_CONFIGURATIONS_PREFIX + 
"reporting.kafka.avro.use.schema.registry";
@@ -718,7 +730,8 @@ public class ConfigurationKeys {
   public static final int DEFAULT_REPORTER_KEY_SIZE = 100;
 
   public static final String METRICS_REPORTING_PUSHERKEYS = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.pusherKeys";
-  public static final String METRICS_REPORTING_EVENTS_PUSHERKEYS = 
METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".pusherKeys";
+  public static final String METRICS_REPORTING_EVENTS_PUSHERKEYS =
+      METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".pusherKeys";
 
   //Graphite-based reporting
   public static final String METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY =
@@ -739,8 +752,8 @@ public class ConfigurationKeys {
   public static final String 
DEFAULT_METRICS_REPORTING_GRAPHITE_EVENTS_VALUE_AS_KEY = 
Boolean.toString(false);
   public static final String METRICS_REPORTING_GRAPHITE_SENDING_TYPE =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.graphite.sending.type";
-  public static final String METRICS_REPORTING_GRAPHITE_PREFIX = 
METRICS_CONFIGURATIONS_PREFIX
-          + "reporting.graphite.prefix";
+  public static final String METRICS_REPORTING_GRAPHITE_PREFIX =
+      METRICS_CONFIGURATIONS_PREFIX + "reporting.graphite.prefix";
   public static final String DEFAULT_METRICS_REPORTING_GRAPHITE_PREFIX = "";
 
   public static final String DEFAULT_METRICS_REPORTING_GRAPHITE_SENDING_TYPE = 
"TCP";
@@ -752,8 +765,7 @@ public class ConfigurationKeys {
   public static final String METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED_KEY =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.events.enabled";
   public static final String DEFAULT_METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED 
= Boolean.toString(false);
-  public static final String METRICS_REPORTING_INFLUXDB_URL =
-      METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.url";
+  public static final String METRICS_REPORTING_INFLUXDB_URL = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.url";
   public static final String METRICS_REPORTING_INFLUXDB_DATABASE =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.database";
   public static final String METRICS_REPORTING_INFLUXDB_EVENTS_DATABASE =
@@ -789,12 +801,14 @@ public class ConfigurationKeys {
   public static final String DEFAULT_ADMIN_SERVER_HOST = "localhost";
   public static final String ADMIN_SERVER_PORT_KEY = "admin.server.port";
   public static final String DEFAULT_ADMIN_SERVER_PORT = "8000";
-  public static final String 
ADMIN_SERVER_HIDE_JOBS_WITHOUT_TASKS_BY_DEFAULT_KEY = 
"admin.server.hide_jobs_without_tasks_by_default.enabled";
+  public static final String 
ADMIN_SERVER_HIDE_JOBS_WITHOUT_TASKS_BY_DEFAULT_KEY =
+      "admin.server.hide_jobs_without_tasks_by_default.enabled";
   public static final String 
DEFAULT_ADMIN_SERVER_HIDE_JOBS_WITHOUT_TASKS_BY_DEFAULT = "false";
   public static final String ADMIN_SERVER_REFRESH_INTERVAL_KEY = 
"admin.server.refresh_interval";
   public static final long DEFAULT_ADMIN_SERVER_REFRESH_INTERVAL = 30000;
 
-  public static final String DEFAULT_ADMIN_SERVER_FACTORY_CLASS = 
"org.apache.gobblin.admin.DefaultAdminWebServerFactory";
+  public static final String DEFAULT_ADMIN_SERVER_FACTORY_CLASS =
+      "org.apache.gobblin.admin.DefaultAdminWebServerFactory";
 
   /**
    * Kafka job configurations.
@@ -811,10 +825,13 @@ public class ConfigurationKeys {
   /**
    * Kafka schema registry
    */
-  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT = 
"kafka.schema.registry.httpclient.so.timeout";
-  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT = 
"kafka.schema.registry.httpclient.conn.timeout";
+  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT =
+      "kafka.schema.registry.httpclient.so.timeout";
+  public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT =
+      "kafka.schema.registry.httpclient.conn.timeout";
   public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES = 
"kafka.schema.registry.retry.times";
-  public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS = 
"kafka.schema.registry.retry.interval.inMillis";
+  public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS =
+      "kafka.schema.registry.retry.interval.inMillis";
 
   /**
    * Job execution info server and history store configuration properties.
@@ -855,7 +872,6 @@ public class ConfigurationKeys {
   public static final String DEFAULT_FS_PROXY_AUTH_METHOD = TOKEN_AUTH;
   public static final String KERBEROS_REALM = "kerberos.realm";
 
-
   /**
    * Azkaban properties.
    */
@@ -874,6 +890,9 @@ public class ConfigurationKeys {
    * Hive registration properties
    */
   public static final String HIVE_REGISTRATION_POLICY = 
"hive.registration.policy";
+  public static final String HIVE_REG_PUBLISHER_CLASS = 
"hive.reg.publisher.class";
+  public static final String DEFAULT_HIVE_REG_PUBLISHER_CLASS =
+      "org.apache.gobblin.publisher.HiveRegistrationPublisher";
 
   /**
    * Config store properties
@@ -890,7 +909,6 @@ public class ConfigurationKeys {
   public static final String TEST_HARNESS_LAUNCHER_IMPL = 
"gobblin.testharness.launcher.impl";
   public static final int PERMISSION_PARSING_RADIX = 8;
 
-
   /**
    * Configuration properties related to Flows
    */
@@ -967,5 +985,6 @@ public class ConfigurationKeys {
    * Configuration related to avro schema check strategy
    */
   public static final String AVRO_SCHEMA_CHECK_STRATEGY = 
"avro.schema.check.strategy";
-  public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT = 
"org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";
+  public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT =
+      "org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";
 }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 56b326e..7e74b9e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -290,6 +290,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
       for (Path path : this.publisherOutputDirs) {
         this.state.appendToSetProp(ConfigurationKeys.PUBLISHER_DIRS, 
path.toString());
       }
+      
this.state.setProp(ConfigurationKeys.PUBLISHER_LATEST_FILE_ARRIVAL_TIMESTAMP, 
System.currentTimeMillis());
     } finally {
       this.closer.close();
     }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
index 0ed6f18..abb6aa0 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.publisher;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -36,22 +37,22 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.hive.HiveRegProps;
 import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.ExecutorsUtils;
 
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * A {@link DataPublisher} that registers the already published data with Hive.
@@ -97,20 +98,28 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
   private static Set<String> pathsToRegisterFromSingleState = 
Sets.newHashSet();
 
   /**
+   * This collection represents all specs that were sent for Hive Registration
+   * This is collected right before calling {@link 
HiveRegister#register(HiveSpec)}
+   */
+  protected static final Collection<HiveSpec> allRegisteredPartitions = new 
ArrayList<>();
+
+  /**
    * @param state This is a Job State
    */
   public HiveRegistrationPublisher(State state) {
     super(state);
     this.hiveRegister = this.closer.register(HiveRegister.get(state));
-    this.hivePolicyExecutor = 
ExecutorsUtils.loggingDecorator(Executors.newFixedThreadPool(new 
HiveRegProps(state).getNumThreads(),
-        ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("HivePolicyExecutor-%d"))));
+    this.hivePolicyExecutor = ExecutorsUtils.loggingDecorator(Executors
+        .newFixedThreadPool(new HiveRegProps(state).getNumThreads(),
+            ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("HivePolicyExecutor-%d"))));
     this.metricContext = Instrumented.getMetricContext(state, 
HiveRegistrationPublisher.class);
 
     isPathDedupeEnabled = state.getPropAsBoolean(PATH_DEDUPE_ENABLED, 
this.DEFAULT_PATH_DEDUPE_ENABLED);
   }
 
   @Override
-  public void close() throws IOException {
+  public void close()
+      throws IOException {
     try {
       ExecutorsUtils.shutdownExecutorService(this.hivePolicyExecutor, 
Optional.of(log));
     } finally {
@@ -118,7 +127,8 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
     }
   }
 
-  protected int computeSpecs(Collection<? extends WorkUnitState> states, 
CompletionService<Collection<HiveSpec>> completionService) {
+  protected int computeSpecs(Collection<? extends WorkUnitState> states,
+      CompletionService<Collection<HiveSpec>> completionService) {
     // Each state in states is task-level State, while superState is the 
Job-level State.
     // Using both State objects to distinguish each HiveRegistrationPolicy so 
that
     // they can carry task-level information to pass into Hive Partition and 
its corresponding Hive Table.
@@ -126,24 +136,24 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
     // Here all runtime task-level props are injected into superstate which 
installed in each Policy Object.
     // runtime.props are comma-separated props collected in runtime.
     int toRegisterPathCount = 0;
-    for (State state:states) {
+    for (State state : states) {
       State taskSpecificState = state;
       if (state.contains(ConfigurationKeys.PUBLISHER_DIRS)) {
 
         // Upstream data attribute is specified, need to inject these info 
into superState as runtimeTableProps.
         if 
(this.hiveRegister.getProps().getUpstreamDataAttrName().isPresent()) {
-          for (String attrName:
-              
LIST_SPLITTER_COMMA.splitToList(this.hiveRegister.getProps().getUpstreamDataAttrName().get())){
+          for (String attrName : LIST_SPLITTER_COMMA
+              
.splitToList(this.hiveRegister.getProps().getUpstreamDataAttrName().get())) {
             if (state.contains(attrName)) {
-              
taskSpecificState.appendToListProp(HiveMetaStoreUtils.RUNTIME_PROPS,
-                  attrName + ":" + state.getProp(attrName));
+              taskSpecificState
+                  .appendToListProp(HiveMetaStoreUtils.RUNTIME_PROPS, attrName 
+ ":" + state.getProp(attrName));
             }
           }
         }
 
         final HiveRegistrationPolicy policy = 
HiveRegistrationPolicyBase.getPolicy(taskSpecificState);
-        for ( final String path : 
state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS) ) {
-          if (isPathDedupeEnabled){
+        for (final String path : 
state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS)) {
+          if (isPathDedupeEnabled) {
             if (pathsToRegisterFromSingleState.contains(path)) {
               continue;
             } else {
@@ -153,7 +163,8 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
           toRegisterPathCount += 1;
           completionService.submit(new Callable<Collection<HiveSpec>>() {
             @Override
-            public Collection<HiveSpec> call() throws Exception {
+            public Collection<HiveSpec> call()
+                throws Exception {
               try (Timer.Context context = 
metricContext.timer(HIVE_SPEC_COMPUTATION_TIMER).time()) {
                 return policy.getHiveSpecs(new Path(path));
               }
@@ -164,15 +175,19 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
     }
     return toRegisterPathCount;
   }
+
   @Deprecated
   @Override
-  public void initialize() throws IOException {}
+  public void initialize()
+      throws IOException {
+  }
 
   /**
    * @param states This is a collection of TaskState.
    */
   @Override
-  public void publishData(Collection<? extends WorkUnitState> states) throws 
IOException {
+  public void publishData(Collection<? extends WorkUnitState> states)
+      throws IOException {
     CompletionService<Collection<HiveSpec>> completionService =
         new ExecutorCompletionService<>(this.hivePolicyExecutor);
 
@@ -180,6 +195,7 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
     for (int i = 0; i < toRegisterPathCount; i++) {
       try {
         for (HiveSpec spec : completionService.take().get()) {
+          allRegisteredPartitions.add(spec);
           this.hiveRegister.register(spec);
         }
       } catch (InterruptedException | ExecutionException e) {
@@ -191,13 +207,14 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
   }
 
   @Override
-  public void publishMetadata(Collection<? extends WorkUnitState> states) 
throws IOException {
+  public void publishMetadata(Collection<? extends WorkUnitState> states)
+      throws IOException {
     // Nothing to do
   }
 
   private static void addRuntimeHiveRegistrationProperties(State state) {
     // Use seconds instead of milliseconds to be consistent with other times 
stored in hive
-    state.appendToListProp(HiveRegProps.HIVE_TABLE_PARTITION_PROPS,
-        String.format("%s:%d", DATA_PUBLISH_TIME, 
TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)));
+    state.appendToListProp(HiveRegProps.HIVE_TABLE_PARTITION_PROPS, 
String.format("%s:%d", DATA_PUBLISH_TIME,
+        TimeUnit.SECONDS.convert(System.currentTimeMillis(), 
TimeUnit.MILLISECONDS)));
   }
 }
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index f481522..dc0b65c 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.hive;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
@@ -32,9 +26,19 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.HiveRegistrationUnit.Column;
@@ -57,7 +61,8 @@ import 
org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
 public abstract class HiveRegister implements Closeable {
 
   public static final String HIVE_REGISTER_TYPE = "hive.register.type";
-  public static final String DEFAULT_HIVE_REGISTER_TYPE = 
"org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister";
+  public static final String DEFAULT_HIVE_REGISTER_TYPE =
+      "org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister";
   public static final String HIVE_TABLE_COMPARATOR_TYPE = 
"hive.table.comparator.type";
   public static final String DEFAULT_HIVE_TABLE_COMPARATOR_TYPE = 
HiveTableComparator.class.getName();
   public static final String HIVE_PARTITION_COMPARATOR_TYPE = 
"hive.partition.comparator.type";
@@ -76,8 +81,8 @@ public abstract class HiveRegister implements Closeable {
   protected HiveRegister(State state) {
     this.props = new HiveRegProps(state);
     this.hiveDbRootDir = this.props.getDbRootDir();
-    this.executor = ExecutorsUtils.loggingDecorator(
-        ScalingThreadPoolExecutor.newScalingThreadPool(0, 
this.props.getNumThreads(), TimeUnit.SECONDS.toMillis(10),
+    this.executor = ExecutorsUtils.loggingDecorator(ScalingThreadPoolExecutor
+        .newScalingThreadPool(0, this.props.getNumThreads(), 
TimeUnit.SECONDS.toMillis(10),
             ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of(getClass().getSimpleName()))));
   }
 
@@ -93,7 +98,8 @@ public abstract class HiveRegister implements Closeable {
     ListenableFuture<Void> future = this.executor.submit(new Callable<Void>() {
 
       @Override
-      public Void call() throws Exception {
+      public Void call()
+          throws Exception {
         try {
           if (spec instanceof HiveSpecWithPredicates && 
!evaluatePredicates((HiveSpecWithPredicates) spec)) {
             log.info("Skipping " + spec + " since predicates return false");
@@ -152,7 +158,8 @@ public abstract class HiveRegister implements Closeable {
    *   the {@link HiveSpec}, since these are done in {@link 
#register(HiveSpec)}.
    * </p>
    */
-  protected abstract void registerPath(HiveSpec spec) throws IOException;
+  protected abstract void registerPath(HiveSpec spec)
+      throws IOException;
 
   /**
    * Create a Hive database if not exists.
@@ -161,7 +168,8 @@ public abstract class HiveRegister implements Closeable {
    * @return true if the db is successfully created; false if the db already 
exists.
    * @throws IOException
    */
-  public abstract boolean createDbIfNotExists(String dbName) throws 
IOException;
+  public abstract boolean createDbIfNotExists(String dbName)
+      throws IOException;
 
   /**
    * Create a Hive table if not exists.
@@ -170,7 +178,8 @@ public abstract class HiveRegister implements Closeable {
    * @return true if the table is successfully created; false if the table 
already exists.
    * @throws IOException
    */
-  public abstract boolean createTableIfNotExists(HiveTable table) throws 
IOException;
+  public abstract boolean createTableIfNotExists(HiveTable table)
+      throws IOException;
 
   /**
    * Add a Hive partition to a table if not exists.
@@ -180,7 +189,8 @@ public abstract class HiveRegister implements Closeable {
    * @return true if the partition is successfully added; false if the 
partition already exists.
    * @throws IOException
    */
-  public abstract boolean addPartitionIfNotExists(HiveTable table, 
HivePartition partition) throws IOException;
+  public abstract boolean addPartitionIfNotExists(HiveTable table, 
HivePartition partition)
+      throws IOException;
 
   /**
    * Determines whether a Hive table exists.
@@ -190,7 +200,8 @@ public abstract class HiveRegister implements Closeable {
    * @return true if the table exists, false otherwise.
    * @throws IOException
    */
-  public abstract boolean existsTable(String dbName, String tableName) throws 
IOException;
+  public abstract boolean existsTable(String dbName, String tableName)
+      throws IOException;
 
   /**
    * Determines whether a Hive partition exists.
@@ -203,7 +214,8 @@ public abstract class HiveRegister implements Closeable {
    * @throws IOException
    */
   public abstract boolean existsPartition(String dbName, String tableName, 
List<Column> partitionKeys,
-      List<String> partitionValues) throws IOException;
+      List<String> partitionValues)
+      throws IOException;
 
   /**
    * Drop a table if exists.
@@ -212,7 +224,8 @@ public abstract class HiveRegister implements Closeable {
    * @param tableName the table name
    * @throws IOException
    */
-  public abstract void dropTableIfExists(String dbName, String tableName) 
throws IOException;
+  public abstract void dropTableIfExists(String dbName, String tableName)
+      throws IOException;
 
   /**
    * Drop a partition if exists.
@@ -224,7 +237,8 @@ public abstract class HiveRegister implements Closeable {
    * @throws IOException
    */
   public abstract void dropPartitionIfExists(String dbName, String tableName, 
List<Column> partitionKeys,
-      List<String> partitionValues) throws IOException;
+      List<String> partitionValues)
+      throws IOException;
 
   /**
    * Get a {@link HiveTable} using the given db name and table name.
@@ -234,7 +248,8 @@ public abstract class HiveRegister implements Closeable {
    * @return an {@link Optional} of {@link HiveTable} if the table exists, 
otherwise {@link Optional#absent()}.
    * @throws IOException
    */
-  public abstract Optional<HiveTable> getTable(String dbName, String 
tableName) throws IOException;
+  public abstract Optional<HiveTable> getTable(String dbName, String tableName)
+      throws IOException;
 
   /**
    * Get a {@link HivePartition} using the given db name, table name, 
partition keys and partition values.
@@ -247,7 +262,8 @@ public abstract class HiveRegister implements Closeable {
    * @throws IOException
    */
   public abstract Optional<HivePartition> getPartition(String dbName, String 
tableName, List<Column> partitionKeys,
-      List<String> partitionValues) throws IOException;
+      List<String> partitionValues)
+      throws IOException;
 
   /**
    * Alter the given {@link HiveTable}. An Exception should be thrown if the 
table does not exist.
@@ -255,7 +271,8 @@ public abstract class HiveRegister implements Closeable {
    * @param table a {@link HiveTable} to which the existing table should be 
updated.
    * @throws IOException
    */
-  public abstract void alterTable(HiveTable table) throws IOException;
+  public abstract void alterTable(HiveTable table)
+      throws IOException;
 
   /**
    * Alter the given {@link HivePartition}. An Exception should be thrown if 
the partition does not exist.
@@ -264,7 +281,8 @@ public abstract class HiveRegister implements Closeable {
    * @param partition a {@link HivePartition} to which the existing partition 
should be updated.
    * @throws IOException
    */
-  public abstract void alterPartition(HiveTable table, HivePartition 
partition) throws IOException;
+  public abstract void alterPartition(HiveTable table, HivePartition partition)
+      throws IOException;
 
   /**
    * Create a table if not exists, or alter a table if exists.
@@ -272,7 +290,8 @@ public abstract class HiveRegister implements Closeable {
    * @param table a {@link HiveTable} to be created or altered
    * @throws IOException
    */
-  public void createOrAlterTable(HiveTable table) throws IOException {
+  public void createOrAlterTable(HiveTable table)
+      throws IOException {
     if (!createTableIfNotExists(table)) {
       alterTable(table);
     }
@@ -285,7 +304,8 @@ public abstract class HiveRegister implements Closeable {
    * @param partition a {@link HivePartition} to which the existing partition 
should be updated.
    * @throws IOException
    */
-  public void addOrAlterPartition(HiveTable table, HivePartition partition) 
throws IOException {
+  public void addOrAlterPartition(HiveTable table, HivePartition partition)
+      throws IOException {
     if (!addPartitionIfNotExists(table, partition)) {
       alterPartition(table, partition);
     }
@@ -311,8 +331,8 @@ public abstract class HiveRegister implements Closeable {
     try {
       Class<?> clazz =
           Class.forName(this.props.getProp(HIVE_PARTITION_COMPARATOR_TYPE, 
DEFAULT_HIVE_PARTITION_COMPARATOR_TYPE));
-      return (HiveRegistrationUnitComparator<?>) 
ConstructorUtils.invokeConstructor(clazz, existingPartition,
-          newPartition);
+      return (HiveRegistrationUnitComparator<?>) ConstructorUtils
+          .invokeConstructor(clazz, existingPartition, newPartition);
     } catch (ReflectiveOperationException e) {
       log.error("Unable to instantiate Hive partition comparator", e);
       throw Throwables.propagate(e);
@@ -329,22 +349,26 @@ public abstract class HiveRegister implements Closeable {
    * @throws IOException if any registration failed or was interrupted.
    */
   @Override
-  public void close() throws IOException {
+  public void close()
+      throws IOException {
     try {
-      for (Map.Entry<String, Future<Void>> entry : this.futures.entrySet()) {
-        try {
-          entry.getValue().get();
-        } catch (ExecutionException ee) {
-          throw new IOException("Failed to finish registration for " + 
entry.getKey(), ee.getCause());
-        }
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
+      waitOnFuturesToFinish();
     } finally {
       ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log));
     }
   }
 
+  public void waitOnFuturesToFinish()
+      throws IOException {
+    for (Map.Entry<String, Future<Void>> entry : this.futures.entrySet()) {
+      try {
+        entry.getValue().get();
+      } catch (InterruptedException | ExecutionException ee) {
+        throw new IOException("Failed to finish registration for " + 
entry.getKey(), ee.getCause());
+      }
+    }
+  }
+
   /**
    * Get an instance of {@link HiveRegister}.
    *
@@ -353,8 +377,7 @@ public abstract class HiveRegister implements Closeable {
    * will be returned. This {@link State} object is also used to instantiate 
the {@link HiveRegister} object.
    */
   public static HiveRegister get(State props) {
-    Optional<String> metastoreUri =
-        
Optional.fromNullable(props.getProperties().getProperty(HIVE_METASTORE_URI_KEY));
+    Optional<String> metastoreUri = 
Optional.fromNullable(props.getProperties().getProperty(HIVE_METASTORE_URI_KEY));
 
     return get(props, metastoreUri);
   }
@@ -383,5 +406,4 @@ public abstract class HiveRegister implements Closeable {
       throw Throwables.propagate(e);
     }
   }
-
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
index 32ad1b9..51ff8cb 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
@@ -20,11 +20,12 @@ package org.apache.gobblin.runtime;
 import java.io.IOException;
 import java.util.Collection;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.publisher.HiveRegistrationPublisher;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * A {@link TaskStateCollectorServiceHandler} implementation that execute hive 
registration on driver level.
@@ -36,19 +37,32 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class HiveRegTaskStateCollectorServiceHandlerImpl implements 
TaskStateCollectorServiceHandler {
 
+  private static final String TASK_COLLECTOR_SERVICE_PREFIX = 
"task.collector.service";
+  private static final String HIVE_REG_PUBLISHER_CLASS = 
"hive.reg.publisher.class";
+  private static final String HIVE_REG_PUBLISHER_CLASS_KEY = 
TASK_COLLECTOR_SERVICE_PREFIX + "." + HIVE_REG_PUBLISHER_CLASS;
+  private static final String DEFAULT_HIVE_REG_PUBLISHER_CLASS =
+      "org.apache.gobblin.publisher.HiveRegistrationPublisher";
   private HiveRegistrationPublisher hiveRegHandler;
 
   public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState) {
-    hiveRegHandler = new HiveRegistrationPublisher(jobState);
+    String className = jobState
+        .getProp(HIVE_REG_PUBLISHER_CLASS_KEY, 
DEFAULT_HIVE_REG_PUBLISHER_CLASS);
+      try {
+        hiveRegHandler = (HiveRegistrationPublisher) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), 
jobState);
+      }catch (ReflectiveOperationException e) {
+        throw new RuntimeException("Could not instantiate 
HiveRegistrationPublisher " + className, e);
+      }
   }
 
   @Override
-  public void handle(Collection<? extends WorkUnitState> taskStates) throws 
IOException {
+  public void handle(Collection<? extends WorkUnitState> taskStates)
+      throws IOException {
     this.hiveRegHandler.publishData(taskStates);
   }
 
   @Override
-  public void close() throws IOException {
+  public void close()
+      throws IOException {
     hiveRegHandler.close();
   }
 }

Reply via email to