This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7ffc72b [GOBBLIN-954] Added support to swap different
HiveRegistrationPublishers
7ffc72b is described below
commit 7ffc72bcd0a11d1ac8ab8c23d93645ff441cbd5f
Author: vbohra <[email protected]>
AuthorDate: Mon Nov 11 10:08:02 2019 -0800
[GOBBLIN-954] Added support to swap different HiveRegistrationPublishers
Closes #2803 from vikrambohra/fileSignal
---
.../gobblin/configuration/ConfigurationKeys.java | 85 ++++++++++-------
.../gobblin/publisher/BaseDataPublisher.java | 1 +
.../publisher/HiveRegistrationPublisher.java | 57 ++++++++----
.../java/org/apache/gobblin/hive/HiveRegister.java | 102 +++++++++++++--------
...iveRegTaskStateCollectorServiceHandlerImpl.java | 24 ++++-
5 files changed, 171 insertions(+), 98 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index cd5b21f..bc810a6 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -65,7 +65,8 @@ public class ConfigurationKeys {
// File system URI for file-system-based task store
public static final String STATE_STORE_FS_URI_KEY = "state.store.fs.uri";
// Thread pool size for listing dataset state store
- public static final String THREADPOOL_SIZE_OF_LISTING_FS_DATASET_STATESTORE
= "state.store.threadpoolSizeOfListingFsDatasetStateStore";
+ public static final String THREADPOOL_SIZE_OF_LISTING_FS_DATASET_STATESTORE =
+ "state.store.threadpoolSizeOfListingFsDatasetStateStore";
public static final int
DEFAULT_THREADPOOL_SIZE_OF_LISTING_FS_DATASET_STATESTORE = 10;
// Enable / disable state store
public static final String STATE_STORE_ENABLED = "state.store.enabled";
@@ -207,7 +208,6 @@ public class ConfigurationKeys {
public static final boolean DEFAULT_CLEANUP_OLD_JOBS_DATA = false;
public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY =
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
-
public static final String QUEUED_TASK_TIME_MAX_SIZE =
"taskexecutor.queued_task_time.history.max_size";
public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
public static final String QUEUED_TASK_TIME_MAX_AGE =
"taskexecutor.queued_task_time.history.max_age";
@@ -305,7 +305,6 @@ public class ConfigurationKeys {
public static final String EXTRACT_ID_TIME_ZONE =
"extract.extractIdTimeZone";
public static final String DEFAULT_EXTRACT_ID_TIME_ZONE = "UTC";
-
/**
* Converter configuration properties.
*/
@@ -323,7 +322,8 @@ public class ConfigurationKeys {
public static final String CONVERTER_AVRO_EXTRACTOR_FIELD_PATH =
"converter.avro.extractor.field.path";
public static final String CONVERTER_STRING_FILTER_PATTERN =
"converter.string.filter.pattern";
public static final String CONVERTER_STRING_SPLITTER_DELIMITER =
"converter.string.splitter.delimiter";
- public static final String CONVERTER_STRING_SPLITTER_SHOULD_TRIM_RESULTS =
"converter.string.splitter.shouldITrimResults";
+ public static final String CONVERTER_STRING_SPLITTER_SHOULD_TRIM_RESULTS =
+ "converter.string.splitter.shouldITrimResults";
public static final boolean
DEFAULT_CONVERTER_STRING_SPLITTER_SHOULD_TRIM_RESULTS = false;
public static final String CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR =
"converter.csv.to.json.enclosedchar";
public static final String DEFAULT_CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR = "\0";
@@ -388,7 +388,6 @@ public class ConfigurationKeys {
public static final String SIMPLE_WRITER_PREPEND_SIZE =
"simple.writer.prepend.size";
public static final String WRITER_ADD_TASK_TIMESTAMP = WRITER_PREFIX +
".addTaskTimestamp";
-
// Internal use only - used to send metadata to publisher
public static final String WRITER_METADATA_KEY = WRITER_PREFIX +
"._internal.metadata";
public static final String WRITER_PARTITION_PATH_KEY = WRITER_PREFIX +
"._internal.partition.path";
@@ -417,7 +416,8 @@ public class ConfigurationKeys {
public static final boolean DEFAULT_CLEAN_ERR_DIR = false;
/** Set the approximate max number of records to write in err_file for each
task. Note the actual number of records
* written may be anything from 0 to about the value set + 100. */
- public static final String ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK =
QUALITY_CHECKER_PREFIX + ".row.errFile.recordsPerTask";
+ public static final String ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK =
+ QUALITY_CHECKER_PREFIX + ".row.errFile.recordsPerTask";
public static final long DEFAULT_ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK =
1000000;
/**
@@ -446,13 +446,15 @@ public class ConfigurationKeys {
* PUBLISH_WRITER_METADATA_MERGER_NAME_KEY: Class to use to merge
writer-generated metadata.
*/
public static final String DATA_PUBLISH_WRITER_METADATA_KEY =
DATA_PUBLISHER_PREFIX + ".metadata.publish.writer";
- public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_KEY =
DATA_PUBLISHER_PREFIX + ".metadata.publish.writer.merger.class";
+ public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_KEY =
+ DATA_PUBLISHER_PREFIX + ".metadata.publish.writer.merger.class";
/**
* Metadata configuration properties used internally
*/
- public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_DEFAULT
= "org.apache.gobblin.metadata.types.GlobalMetadataJsonMerger";
- public static final String DATA_PUBLISHER_METADATA_OUTPUT_DIR =
DATA_PUBLISHER_PREFIX + ".metadata.output.dir";
+ public static final String DATA_PUBLISH_WRITER_METADATA_MERGER_NAME_DEFAULT =
+ "org.apache.gobblin.metadata.types.GlobalMetadataJsonMerger";
+ public static final String DATA_PUBLISHER_METADATA_OUTPUT_DIR =
DATA_PUBLISHER_PREFIX + ".metadata.output.dir";
//Metadata String in the configuration file
public static final String DATA_PUBLISHER_METADATA_STR =
DATA_PUBLISHER_PREFIX + ".metadata.string";
public static final String DATA_PUBLISHER_METADATA_OUTPUT_FILE =
DATA_PUBLISHER_PREFIX + ".metadata.output_file";
@@ -467,7 +469,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_DATA_PUBLISHER_TYPE =
"org.apache.gobblin.publisher.BaseDataPublisher";
public static final String DATA_PUBLISHER_FILE_SYSTEM_URI =
DATA_PUBLISHER_PREFIX + ".fs.uri";
public static final String DATA_PUBLISHER_FINAL_DIR = DATA_PUBLISHER_PREFIX
+ ".final.dir";
- public static final String DATA_PUBLISHER_APPEND_EXTRACT_TO_FINAL_DIR =
DATA_PUBLISHER_PREFIX + ".appendExtractToFinalDir";
+ public static final String DATA_PUBLISHER_APPEND_EXTRACT_TO_FINAL_DIR =
+ DATA_PUBLISHER_PREFIX + ".appendExtractToFinalDir";
public static final boolean
DEFAULT_DATA_PUBLISHER_APPEND_EXTRACT_TO_FINAL_DIR = true;
public static final String DATA_PUBLISHER_REPLACE_FINAL_DIR =
DATA_PUBLISHER_PREFIX + ".replace.final.dir";
public static final String DATA_PUBLISHER_FINAL_NAME = DATA_PUBLISHER_PREFIX
+ ".final.name";
@@ -480,6 +483,8 @@ public class ConfigurationKeys {
public static final String PUBLISHER_DIRS = DATA_PUBLISHER_PREFIX +
".output.dirs";
public static final String DATA_PUBLISHER_CAN_BE_SKIPPED =
DATA_PUBLISHER_PREFIX + ".canBeSkipped";
public static final boolean DEFAULT_DATA_PUBLISHER_CAN_BE_SKIPPED = false;
+ public static final String PUBLISHER_LATEST_FILE_ARRIVAL_TIMESTAMP =
+ DATA_PUBLISHER_PREFIX + ".latest.file.arrival.timestamp";
/**
* Configuration properties used by the extractor.
@@ -561,7 +566,8 @@ public class ConfigurationKeys {
public static final String SOURCE_FILEBASED_OPTIONAL_DOWNLOADER_CLASS =
"source.filebased.downloader.class";
public static final String SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH =
"source.filebased.encrypted";
- public static final String SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED =
"source.filebased.fs.prior.snapshot.required";
+ public static final String SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED =
+ "source.filebased.fs.prior.snapshot.required";
public static final boolean
DEFAULT_SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED = false;
/**
@@ -604,7 +610,6 @@ public class ConfigurationKeys {
public static final String SOURCE_CONN_REFRESH_TOKEN = SOURCE_CONN_PREFIX +
"refresh.token";
public static final String SOURCE_CONN_DECRYPT_CLIENT_SECRET =
SOURCE_CONN_PREFIX + "decrypt.client.id.secret";
-
/**
* Source default configurations.
*/
@@ -647,7 +652,8 @@ public class ConfigurationKeys {
/**
* Set to true so that job still proceed if TaskStateCollectorService failed.
*/
- public static final String
JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE =
"job.proceed.onTaskStateCollectorServiceFailure";
+ public static final String
JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE =
+ "job.proceed.onTaskStateCollectorServiceFailure";
/**
* Configuration properties for email settings.
@@ -671,10 +677,12 @@ public class ConfigurationKeys {
public static final String METRICS_REPORT_INTERVAL_KEY =
METRICS_CONFIGURATIONS_PREFIX + "report.interval";
public static final String DEFAULT_METRICS_REPORT_INTERVAL =
Long.toString(TimeUnit.SECONDS.toMillis(30));
public static final String METRIC_CONTEXT_NAME_KEY = "metrics.context.name";
- public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES =
METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
+ public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES =
+ METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15;
public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX =
"metrics.reporting";
- public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX =
METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
+ public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX =
+ METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
// File-based reporting
public static final String METRICS_REPORTING_FILE_ENABLED_KEY =
METRICS_CONFIGURATIONS_PREFIX + "reporting.file.enabled";
@@ -684,7 +692,7 @@ public class ConfigurationKeys {
public static final String DEFAULT_METRICS_FILE_SUFFIX = "";
public static final String FAILURE_REPORTING_FILE_ENABLED_KEY =
"failure.reporting.file.enabled";
public static final String DEFAULT_FAILURE_REPORTING_FILE_ENABLED =
Boolean.toString(true);
- public static final String FAILURE_LOG_DIR_KEY = "failure.log.dir";
+ public static final String FAILURE_LOG_DIR_KEY = "failure.log.dir";
// JMX-based reporting
public static final String METRICS_REPORTING_JMX_ENABLED_KEY =
@@ -695,11 +703,15 @@ public class ConfigurationKeys {
public static final String METRICS_REPORTING_KAFKA_ENABLED_KEY =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.enabled";
public static final String DEFAULT_METRICS_REPORTING_KAFKA_ENABLED =
Boolean.toString(false);
- public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS =
"org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
+ public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS =
+ "org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
public static final String METRICS_REPORTING_KAFKA_FORMAT =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format";
- public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT =
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
- public static final String METRICS_REPORTING_KAFKAPUSHERKEYS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafkaPusherKeys";
- public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys";
+ public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT =
+ METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
+ public static final String METRICS_REPORTING_KAFKAPUSHERKEYS =
+ METRICS_CONFIGURATIONS_PREFIX + "reporting.kafkaPusherKeys";
+ public static final String METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS =
+ METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafkaPusherKeys";
public static final String DEFAULT_METRICS_REPORTING_KAFKA_FORMAT = "json";
public static final String METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY =
METRICS_CONFIGURATIONS_PREFIX +
"reporting.kafka.avro.use.schema.registry";
@@ -718,7 +730,8 @@ public class ConfigurationKeys {
public static final int DEFAULT_REPORTER_KEY_SIZE = 100;
public static final String METRICS_REPORTING_PUSHERKEYS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.pusherKeys";
- public static final String METRICS_REPORTING_EVENTS_PUSHERKEYS =
METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".pusherKeys";
+ public static final String METRICS_REPORTING_EVENTS_PUSHERKEYS =
+ METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".pusherKeys";
//Graphite-based reporting
public static final String METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY =
@@ -739,8 +752,8 @@ public class ConfigurationKeys {
public static final String
DEFAULT_METRICS_REPORTING_GRAPHITE_EVENTS_VALUE_AS_KEY =
Boolean.toString(false);
public static final String METRICS_REPORTING_GRAPHITE_SENDING_TYPE =
METRICS_CONFIGURATIONS_PREFIX + "reporting.graphite.sending.type";
- public static final String METRICS_REPORTING_GRAPHITE_PREFIX =
METRICS_CONFIGURATIONS_PREFIX
- + "reporting.graphite.prefix";
+ public static final String METRICS_REPORTING_GRAPHITE_PREFIX =
+ METRICS_CONFIGURATIONS_PREFIX + "reporting.graphite.prefix";
public static final String DEFAULT_METRICS_REPORTING_GRAPHITE_PREFIX = "";
public static final String DEFAULT_METRICS_REPORTING_GRAPHITE_SENDING_TYPE =
"TCP";
@@ -752,8 +765,7 @@ public class ConfigurationKeys {
public static final String METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED_KEY =
METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.events.enabled";
public static final String DEFAULT_METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED
= Boolean.toString(false);
- public static final String METRICS_REPORTING_INFLUXDB_URL =
- METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.url";
+ public static final String METRICS_REPORTING_INFLUXDB_URL =
METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.url";
public static final String METRICS_REPORTING_INFLUXDB_DATABASE =
METRICS_CONFIGURATIONS_PREFIX + "reporting.influxdb.database";
public static final String METRICS_REPORTING_INFLUXDB_EVENTS_DATABASE =
@@ -789,12 +801,14 @@ public class ConfigurationKeys {
public static final String DEFAULT_ADMIN_SERVER_HOST = "localhost";
public static final String ADMIN_SERVER_PORT_KEY = "admin.server.port";
public static final String DEFAULT_ADMIN_SERVER_PORT = "8000";
- public static final String
ADMIN_SERVER_HIDE_JOBS_WITHOUT_TASKS_BY_DEFAULT_KEY =
"admin.server.hide_jobs_without_tasks_by_default.enabled";
+ public static final String
ADMIN_SERVER_HIDE_JOBS_WITHOUT_TASKS_BY_DEFAULT_KEY =
+ "admin.server.hide_jobs_without_tasks_by_default.enabled";
public static final String
DEFAULT_ADMIN_SERVER_HIDE_JOBS_WITHOUT_TASKS_BY_DEFAULT = "false";
public static final String ADMIN_SERVER_REFRESH_INTERVAL_KEY =
"admin.server.refresh_interval";
public static final long DEFAULT_ADMIN_SERVER_REFRESH_INTERVAL = 30000;
- public static final String DEFAULT_ADMIN_SERVER_FACTORY_CLASS =
"org.apache.gobblin.admin.DefaultAdminWebServerFactory";
+ public static final String DEFAULT_ADMIN_SERVER_FACTORY_CLASS =
+ "org.apache.gobblin.admin.DefaultAdminWebServerFactory";
/**
* Kafka job configurations.
@@ -811,10 +825,13 @@ public class ConfigurationKeys {
/**
* Kafka schema registry
*/
- public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT =
"kafka.schema.registry.httpclient.so.timeout";
- public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT =
"kafka.schema.registry.httpclient.conn.timeout";
+ public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_SO_TIMEOUT =
+ "kafka.schema.registry.httpclient.so.timeout";
+ public static final String KAFKA_SCHEMA_REGISTRY_HTTPCLIENT_CONN_TIMEOUT =
+ "kafka.schema.registry.httpclient.conn.timeout";
public static final String KAFKA_SCHEMA_REGISTRY_RETRY_TIMES =
"kafka.schema.registry.retry.times";
- public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS =
"kafka.schema.registry.retry.interval.inMillis";
+ public static final String KAFKA_SCHEMA_REGISTRY_RETRY_INTERVAL_IN_MILLIS =
+ "kafka.schema.registry.retry.interval.inMillis";
/**
* Job execution info server and history store configuration properties.
@@ -855,7 +872,6 @@ public class ConfigurationKeys {
public static final String DEFAULT_FS_PROXY_AUTH_METHOD = TOKEN_AUTH;
public static final String KERBEROS_REALM = "kerberos.realm";
-
/**
* Azkaban properties.
*/
@@ -874,6 +890,9 @@ public class ConfigurationKeys {
* Hive registration properties
*/
public static final String HIVE_REGISTRATION_POLICY =
"hive.registration.policy";
+ public static final String HIVE_REG_PUBLISHER_CLASS =
"hive.reg.publisher.class";
+ public static final String DEFAULT_HIVE_REG_PUBLISHER_CLASS =
+ "org.apache.gobblin.publisher.HiveRegistrationPublisher";
/**
* Config store properties
@@ -890,7 +909,6 @@ public class ConfigurationKeys {
public static final String TEST_HARNESS_LAUNCHER_IMPL =
"gobblin.testharness.launcher.impl";
public static final int PERMISSION_PARSING_RADIX = 8;
-
/**
* Configuration properties related to Flows
*/
@@ -967,5 +985,6 @@ public class ConfigurationKeys {
* Configuration related to avro schema check strategy
*/
public static final String AVRO_SCHEMA_CHECK_STRATEGY =
"avro.schema.check.strategy";
- public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT =
"org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";
+ public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT =
+ "org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 56b326e..7e74b9e 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -290,6 +290,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
for (Path path : this.publisherOutputDirs) {
this.state.appendToSetProp(ConfigurationKeys.PUBLISHER_DIRS,
path.toString());
}
+
this.state.setProp(ConfigurationKeys.PUBLISHER_LATEST_FILE_ARRIVAL_TIMESTAMP,
System.currentTimeMillis());
} finally {
this.closer.close();
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
index 0ed6f18..abb6aa0 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.publisher;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -36,22 +37,22 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.hive.HiveRegProps;
import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.util.ExecutorsUtils;
-import lombok.extern.slf4j.Slf4j;
-
/**
* A {@link DataPublisher} that registers the already published data with Hive.
@@ -97,20 +98,28 @@ public class HiveRegistrationPublisher extends
DataPublisher {
private static Set<String> pathsToRegisterFromSingleState =
Sets.newHashSet();
/**
+ * This collection represents all specs that were sent for Hive Registration
+ * This is collected right before calling {@link
HiveRegister#register(HiveSpec)}
+ */
+ protected static final Collection<HiveSpec> allRegisteredPartitions = new
ArrayList<>();
+
+ /**
* @param state This is a Job State
*/
public HiveRegistrationPublisher(State state) {
super(state);
this.hiveRegister = this.closer.register(HiveRegister.get(state));
- this.hivePolicyExecutor =
ExecutorsUtils.loggingDecorator(Executors.newFixedThreadPool(new
HiveRegProps(state).getNumThreads(),
- ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("HivePolicyExecutor-%d"))));
+ this.hivePolicyExecutor = ExecutorsUtils.loggingDecorator(Executors
+ .newFixedThreadPool(new HiveRegProps(state).getNumThreads(),
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("HivePolicyExecutor-%d"))));
this.metricContext = Instrumented.getMetricContext(state,
HiveRegistrationPublisher.class);
isPathDedupeEnabled = state.getPropAsBoolean(PATH_DEDUPE_ENABLED,
this.DEFAULT_PATH_DEDUPE_ENABLED);
}
@Override
- public void close() throws IOException {
+ public void close()
+ throws IOException {
try {
ExecutorsUtils.shutdownExecutorService(this.hivePolicyExecutor,
Optional.of(log));
} finally {
@@ -118,7 +127,8 @@ public class HiveRegistrationPublisher extends
DataPublisher {
}
}
- protected int computeSpecs(Collection<? extends WorkUnitState> states,
CompletionService<Collection<HiveSpec>> completionService) {
+ protected int computeSpecs(Collection<? extends WorkUnitState> states,
+ CompletionService<Collection<HiveSpec>> completionService) {
// Each state in states is task-level State, while superState is the
Job-level State.
// Using both State objects to distinguish each HiveRegistrationPolicy so
that
// they can carry task-level information to pass into Hive Partition and
its corresponding Hive Table.
@@ -126,24 +136,24 @@ public class HiveRegistrationPublisher extends
DataPublisher {
// Here all runtime task-level props are injected into superstate which
installed in each Policy Object.
// runtime.props are comma-separated props collected in runtime.
int toRegisterPathCount = 0;
- for (State state:states) {
+ for (State state : states) {
State taskSpecificState = state;
if (state.contains(ConfigurationKeys.PUBLISHER_DIRS)) {
// Upstream data attribute is specified, need to inject these info
into superState as runtimeTableProps.
if
(this.hiveRegister.getProps().getUpstreamDataAttrName().isPresent()) {
- for (String attrName:
-
LIST_SPLITTER_COMMA.splitToList(this.hiveRegister.getProps().getUpstreamDataAttrName().get())){
+ for (String attrName : LIST_SPLITTER_COMMA
+
.splitToList(this.hiveRegister.getProps().getUpstreamDataAttrName().get())) {
if (state.contains(attrName)) {
-
taskSpecificState.appendToListProp(HiveMetaStoreUtils.RUNTIME_PROPS,
- attrName + ":" + state.getProp(attrName));
+ taskSpecificState
+ .appendToListProp(HiveMetaStoreUtils.RUNTIME_PROPS, attrName
+ ":" + state.getProp(attrName));
}
}
}
final HiveRegistrationPolicy policy =
HiveRegistrationPolicyBase.getPolicy(taskSpecificState);
- for ( final String path :
state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS) ) {
- if (isPathDedupeEnabled){
+ for (final String path :
state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS)) {
+ if (isPathDedupeEnabled) {
if (pathsToRegisterFromSingleState.contains(path)) {
continue;
} else {
@@ -153,7 +163,8 @@ public class HiveRegistrationPublisher extends
DataPublisher {
toRegisterPathCount += 1;
completionService.submit(new Callable<Collection<HiveSpec>>() {
@Override
- public Collection<HiveSpec> call() throws Exception {
+ public Collection<HiveSpec> call()
+ throws Exception {
try (Timer.Context context =
metricContext.timer(HIVE_SPEC_COMPUTATION_TIMER).time()) {
return policy.getHiveSpecs(new Path(path));
}
@@ -164,15 +175,19 @@ public class HiveRegistrationPublisher extends
DataPublisher {
}
return toRegisterPathCount;
}
+
@Deprecated
@Override
- public void initialize() throws IOException {}
+ public void initialize()
+ throws IOException {
+ }
/**
* @param states This is a collection of TaskState.
*/
@Override
- public void publishData(Collection<? extends WorkUnitState> states) throws
IOException {
+ public void publishData(Collection<? extends WorkUnitState> states)
+ throws IOException {
CompletionService<Collection<HiveSpec>> completionService =
new ExecutorCompletionService<>(this.hivePolicyExecutor);
@@ -180,6 +195,7 @@ public class HiveRegistrationPublisher extends
DataPublisher {
for (int i = 0; i < toRegisterPathCount; i++) {
try {
for (HiveSpec spec : completionService.take().get()) {
+ allRegisteredPartitions.add(spec);
this.hiveRegister.register(spec);
}
} catch (InterruptedException | ExecutionException e) {
@@ -191,13 +207,14 @@ public class HiveRegistrationPublisher extends
DataPublisher {
}
@Override
- public void publishMetadata(Collection<? extends WorkUnitState> states)
throws IOException {
+ public void publishMetadata(Collection<? extends WorkUnitState> states)
+ throws IOException {
// Nothing to do
}
private static void addRuntimeHiveRegistrationProperties(State state) {
// Use seconds instead of milliseconds to be consistent with other times
stored in hive
- state.appendToListProp(HiveRegProps.HIVE_TABLE_PARTITION_PROPS,
- String.format("%s:%d", DATA_PUBLISH_TIME,
TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)));
+ state.appendToListProp(HiveRegProps.HIVE_TABLE_PARTITION_PROPS,
String.format("%s:%d", DATA_PUBLISH_TIME,
+ TimeUnit.SECONDS.convert(System.currentTimeMillis(),
TimeUnit.MILLISECONDS)));
}
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index f481522..dc0b65c 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -17,12 +17,6 @@
package org.apache.gobblin.hive;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
@@ -32,9 +26,19 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.HiveRegistrationUnit.Column;
@@ -57,7 +61,8 @@ import
org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
public abstract class HiveRegister implements Closeable {
public static final String HIVE_REGISTER_TYPE = "hive.register.type";
- public static final String DEFAULT_HIVE_REGISTER_TYPE =
"org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister";
+ public static final String DEFAULT_HIVE_REGISTER_TYPE =
+ "org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister";
public static final String HIVE_TABLE_COMPARATOR_TYPE =
"hive.table.comparator.type";
public static final String DEFAULT_HIVE_TABLE_COMPARATOR_TYPE =
HiveTableComparator.class.getName();
public static final String HIVE_PARTITION_COMPARATOR_TYPE =
"hive.partition.comparator.type";
@@ -76,8 +81,8 @@ public abstract class HiveRegister implements Closeable {
protected HiveRegister(State state) {
this.props = new HiveRegProps(state);
this.hiveDbRootDir = this.props.getDbRootDir();
- this.executor = ExecutorsUtils.loggingDecorator(
- ScalingThreadPoolExecutor.newScalingThreadPool(0,
this.props.getNumThreads(), TimeUnit.SECONDS.toMillis(10),
+ this.executor = ExecutorsUtils.loggingDecorator(ScalingThreadPoolExecutor
+ .newScalingThreadPool(0, this.props.getNumThreads(),
TimeUnit.SECONDS.toMillis(10),
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of(getClass().getSimpleName()))));
}
@@ -93,7 +98,8 @@ public abstract class HiveRegister implements Closeable {
ListenableFuture<Void> future = this.executor.submit(new Callable<Void>() {
@Override
- public Void call() throws Exception {
+ public Void call()
+ throws Exception {
try {
if (spec instanceof HiveSpecWithPredicates &&
!evaluatePredicates((HiveSpecWithPredicates) spec)) {
log.info("Skipping " + spec + " since predicates return false");
@@ -152,7 +158,8 @@ public abstract class HiveRegister implements Closeable {
* the {@link HiveSpec}, since these are done in {@link
#register(HiveSpec)}.
* </p>
*/
- protected abstract void registerPath(HiveSpec spec) throws IOException;
+ protected abstract void registerPath(HiveSpec spec)
+ throws IOException;
/**
* Create a Hive database if not exists.
@@ -161,7 +168,8 @@ public abstract class HiveRegister implements Closeable {
* @return true if the db is successfully created; false if the db already
exists.
* @throws IOException
*/
- public abstract boolean createDbIfNotExists(String dbName) throws
IOException;
+ public abstract boolean createDbIfNotExists(String dbName)
+ throws IOException;
/**
* Create a Hive table if not exists.
@@ -170,7 +178,8 @@ public abstract class HiveRegister implements Closeable {
* @return true if the table is successfully created; false if the table
already exists.
* @throws IOException
*/
- public abstract boolean createTableIfNotExists(HiveTable table) throws
IOException;
+ public abstract boolean createTableIfNotExists(HiveTable table)
+ throws IOException;
/**
* Add a Hive partition to a table if not exists.
@@ -180,7 +189,8 @@ public abstract class HiveRegister implements Closeable {
* @return true if the partition is successfully added; false if the
partition already exists.
* @throws IOException
*/
- public abstract boolean addPartitionIfNotExists(HiveTable table,
HivePartition partition) throws IOException;
+ public abstract boolean addPartitionIfNotExists(HiveTable table,
HivePartition partition)
+ throws IOException;
/**
* Determines whether a Hive table exists.
@@ -190,7 +200,8 @@ public abstract class HiveRegister implements Closeable {
* @return true if the table exists, false otherwise.
* @throws IOException
*/
- public abstract boolean existsTable(String dbName, String tableName) throws
IOException;
+ public abstract boolean existsTable(String dbName, String tableName)
+ throws IOException;
/**
* Determines whether a Hive partition exists.
@@ -203,7 +214,8 @@ public abstract class HiveRegister implements Closeable {
* @throws IOException
*/
public abstract boolean existsPartition(String dbName, String tableName,
List<Column> partitionKeys,
- List<String> partitionValues) throws IOException;
+ List<String> partitionValues)
+ throws IOException;
/**
* Drop a table if exists.
@@ -212,7 +224,8 @@ public abstract class HiveRegister implements Closeable {
* @param tableName the table name
* @throws IOException
*/
- public abstract void dropTableIfExists(String dbName, String tableName)
throws IOException;
+ public abstract void dropTableIfExists(String dbName, String tableName)
+ throws IOException;
/**
* Drop a partition if exists.
@@ -224,7 +237,8 @@ public abstract class HiveRegister implements Closeable {
* @throws IOException
*/
public abstract void dropPartitionIfExists(String dbName, String tableName,
List<Column> partitionKeys,
- List<String> partitionValues) throws IOException;
+ List<String> partitionValues)
+ throws IOException;
/**
* Get a {@link HiveTable} using the given db name and table name.
@@ -234,7 +248,8 @@ public abstract class HiveRegister implements Closeable {
* @return an {@link Optional} of {@link HiveTable} if the table exists,
otherwise {@link Optional#absent()}.
* @throws IOException
*/
- public abstract Optional<HiveTable> getTable(String dbName, String tableName) throws IOException;
+ public abstract Optional<HiveTable> getTable(String dbName, String tableName)
+ throws IOException;
/**
* Get a {@link HivePartition} using the given db name, table name,
partition keys and partition values.
@@ -247,7 +262,8 @@ public abstract class HiveRegister implements Closeable {
* @throws IOException
*/
public abstract Optional<HivePartition> getPartition(String dbName, String
tableName, List<Column> partitionKeys,
- List<String> partitionValues) throws IOException;
+ List<String> partitionValues)
+ throws IOException;
/**
* Alter the given {@link HiveTable}. An Exception should be thrown if the
table does not exist.
@@ -255,7 +271,8 @@ public abstract class HiveRegister implements Closeable {
* @param table a {@link HiveTable} to which the existing table should be
updated.
* @throws IOException
*/
- public abstract void alterTable(HiveTable table) throws IOException;
+ public abstract void alterTable(HiveTable table)
+ throws IOException;
/**
* Alter the given {@link HivePartition}. An Exception should be thrown if
the partition does not exist.
@@ -264,7 +281,8 @@ public abstract class HiveRegister implements Closeable {
* @param partition a {@link HivePartition} to which the existing partition
should be updated.
* @throws IOException
*/
- public abstract void alterPartition(HiveTable table, HivePartition partition) throws IOException;
+ public abstract void alterPartition(HiveTable table, HivePartition partition)
+ throws IOException;
/**
* Create a table if not exists, or alter a table if exists.
@@ -272,7 +290,8 @@ public abstract class HiveRegister implements Closeable {
* @param table a {@link HiveTable} to be created or altered
* @throws IOException
*/
- public void createOrAlterTable(HiveTable table) throws IOException {
+ public void createOrAlterTable(HiveTable table)
+ throws IOException {
if (!createTableIfNotExists(table)) {
alterTable(table);
}
@@ -285,7 +304,8 @@ public abstract class HiveRegister implements Closeable {
* @param partition a {@link HivePartition} to which the existing partition
should be updated.
* @throws IOException
*/
- public void addOrAlterPartition(HiveTable table, HivePartition partition) throws IOException {
+ public void addOrAlterPartition(HiveTable table, HivePartition partition)
+ throws IOException {
if (!addPartitionIfNotExists(table, partition)) {
alterPartition(table, partition);
}
@@ -311,8 +331,8 @@ public abstract class HiveRegister implements Closeable {
try {
Class<?> clazz =
Class.forName(this.props.getProp(HIVE_PARTITION_COMPARATOR_TYPE,
DEFAULT_HIVE_PARTITION_COMPARATOR_TYPE));
- return (HiveRegistrationUnitComparator<?>) ConstructorUtils.invokeConstructor(clazz, existingPartition,
- newPartition);
+ return (HiveRegistrationUnitComparator<?>) ConstructorUtils
+ .invokeConstructor(clazz, existingPartition, newPartition);
} catch (ReflectiveOperationException e) {
log.error("Unable to instantiate Hive partition comparator", e);
throw Throwables.propagate(e);
@@ -329,22 +349,26 @@ public abstract class HiveRegister implements Closeable {
* @throws IOException if any registration failed or was interrupted.
*/
@Override
- public void close() throws IOException {
+ public void close()
+ throws IOException {
try {
- for (Map.Entry<String, Future<Void>> entry : this.futures.entrySet()) {
- try {
- entry.getValue().get();
- } catch (ExecutionException ee) {
- throw new IOException("Failed to finish registration for " + entry.getKey(), ee.getCause());
- }
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
+ waitOnFuturesToFinish();
} finally {
ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log));
}
}
+ public void waitOnFuturesToFinish()
+ throws IOException {
+ for (Map.Entry<String, Future<Void>> entry : this.futures.entrySet()) {
+ try {
+ entry.getValue().get();
+ } catch (InterruptedException | ExecutionException ee) {
+ throw new IOException("Failed to finish registration for " + entry.getKey(), ee.getCause());
+ }
+ }
+ }
+
/**
* Get an instance of {@link HiveRegister}.
*
@@ -353,8 +377,7 @@ public abstract class HiveRegister implements Closeable {
* will be returned. This {@link State} object is also used to instantiate
the {@link HiveRegister} object.
*/
public static HiveRegister get(State props) {
- Optional<String> metastoreUri =
- Optional.fromNullable(props.getProperties().getProperty(HIVE_METASTORE_URI_KEY));
+ Optional<String> metastoreUri = Optional.fromNullable(props.getProperties().getProperty(HIVE_METASTORE_URI_KEY));
return get(props, metastoreUri);
}
@@ -383,5 +406,4 @@ public abstract class HiveRegister implements Closeable {
throw Throwables.propagate(e);
}
}
-
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
index 32ad1b9..51ff8cb 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
@@ -20,11 +20,12 @@ package org.apache.gobblin.runtime;
import java.io.IOException;
import java.util.Collection;
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.publisher.HiveRegistrationPublisher;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import lombok.extern.slf4j.Slf4j;
/**
* A {@link TaskStateCollectorServiceHandler} implementation that execute hive
registration on driver level.
@@ -36,19 +37,32 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HiveRegTaskStateCollectorServiceHandlerImpl implements
TaskStateCollectorServiceHandler {
+ private static final String TASK_COLLECTOR_SERVICE_PREFIX = "task.collector.service";
+ private static final String HIVE_REG_PUBLISHER_CLASS = "hive.reg.publisher.class";
+ private static final String HIVE_REG_PUBLISHER_CLASS_KEY = TASK_COLLECTOR_SERVICE_PREFIX + "." + HIVE_REG_PUBLISHER_CLASS;
+ private static final String DEFAULT_HIVE_REG_PUBLISHER_CLASS =
+ "org.apache.gobblin.publisher.HiveRegistrationPublisher";
private HiveRegistrationPublisher hiveRegHandler;
public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState) {
- hiveRegHandler = new HiveRegistrationPublisher(jobState);
+ String className = jobState
+ .getProp(HIVE_REG_PUBLISHER_CLASS_KEY, DEFAULT_HIVE_REG_PUBLISHER_CLASS);
+ try {
+ hiveRegHandler = (HiveRegistrationPublisher) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), jobState);
+ }catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Could not instantiate HiveRegistrationPublisher " + className, e);
+ }
}
@Override
- public void handle(Collection<? extends WorkUnitState> taskStates) throws IOException {
+ public void handle(Collection<? extends WorkUnitState> taskStates)
+ throws IOException {
this.hiveRegHandler.publishData(taskStates);
}
@Override
- public void close() throws IOException {
+ public void close()
+ throws IOException {
hiveRegHandler.close();
}
}