This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be97acf [GOBBLIN-852] Reorganize the code for hive registration to isolate function
be97acf is described below
commit be97acfd596b47edaa0e613d15f0ca267e525fc5
Author: Zihan Li <[email protected]>
AuthorDate: Tue Aug 13 10:05:59 2019 -0700
[GOBBLIN-852] Reorganize the code for hive registration to isolate function
Closes #2708 from ZihanLi58/ETL-8815
---
.../publisher/HiveRegistrationPublisher.java | 50 ++++++++++++----------
1 file changed, 28 insertions(+), 22 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
index 17370b7..0ed6f18 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
@@ -76,9 +76,9 @@ public class HiveRegistrationPublisher extends DataPublisher {
private static final boolean DEFAULT_PATH_DEDUPE_ENABLED = true;
private final Closer closer = Closer.create();
- private final HiveRegister hiveRegister;
- private final ExecutorService hivePolicyExecutor;
- private final MetricContext metricContext;
+ protected final HiveRegister hiveRegister;
+ protected final ExecutorService hivePolicyExecutor;
+ protected final MetricContext metricContext;
/**
* The configuration to determine if path deduplication should be enabled
during Hive Registration process.
@@ -88,7 +88,7 @@ public class HiveRegistrationPublisher extends DataPublisher {
*
* e.g. In streaming mode, there could be cases that files(e.g. avro) under
single topic folder carry different schema.
*/
- private boolean isPathDedupeEnabled;
+ protected boolean isPathDedupeEnabled;
/**
* Make the deduplication of path to be registered in the Publisher level,
@@ -118,25 +118,14 @@ public class HiveRegistrationPublisher extends DataPublisher {
}
}
- @Deprecated
- @Override
- public void initialize() throws IOException {}
-
- /**
- * @param states This is a collection of TaskState.
- */
- @Override
- public void publishData(Collection<? extends WorkUnitState> states) throws
IOException {
- CompletionService<Collection<HiveSpec>> completionService =
- new ExecutorCompletionService<>(this.hivePolicyExecutor);
-
+ protected int computeSpecs(Collection<? extends WorkUnitState> states,
CompletionService<Collection<HiveSpec>> completionService) {
// Each state in states is task-level State, while superState is the
Job-level State.
// Using both State objects to distinguish each HiveRegistrationPolicy so
that
// they can carry task-level information to pass into Hive Partition and
its corresponding Hive Table.
// Here all runtime task-level props are injected into superstate which
installed in each Policy Object.
// runtime.props are comma-separated props collected in runtime.
- int toRegisterPathCount = 0 ;
+ int toRegisterPathCount = 0;
for (State state:states) {
State taskSpecificState = state;
if (state.contains(ConfigurationKeys.PUBLISHER_DIRS)) {
@@ -147,17 +136,20 @@ public class HiveRegistrationPublisher extends DataPublisher {
LIST_SPLITTER_COMMA.splitToList(this.hiveRegister.getProps().getUpstreamDataAttrName().get())){
if (state.contains(attrName)) {
taskSpecificState.appendToListProp(HiveMetaStoreUtils.RUNTIME_PROPS,
- attrName + ":" + state.getProp(attrName));
+ attrName + ":" + state.getProp(attrName));
}
}
}
final HiveRegistrationPolicy policy =
HiveRegistrationPolicyBase.getPolicy(taskSpecificState);
for ( final String path :
state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS) ) {
- if (isPathDedupeEnabled &&
pathsToRegisterFromSingleState.contains(path)){
- continue;
+ if (isPathDedupeEnabled){
+ if (pathsToRegisterFromSingleState.contains(path)) {
+ continue;
+ } else {
+ pathsToRegisterFromSingleState.add(path);
+ }
}
- pathsToRegisterFromSingleState.add(path);
toRegisterPathCount += 1;
completionService.submit(new Callable<Collection<HiveSpec>>() {
@Override
@@ -169,8 +161,22 @@ public class HiveRegistrationPublisher extends DataPublisher {
});
}
}
- else continue;
}
+ return toRegisterPathCount;
+ }
+ @Deprecated
+ @Override
+ public void initialize() throws IOException {}
+
+ /**
+ * @param states This is a collection of TaskState.
+ */
+ @Override
+ public void publishData(Collection<? extends WorkUnitState> states) throws
IOException {
+ CompletionService<Collection<HiveSpec>> completionService =
+ new ExecutorCompletionService<>(this.hivePolicyExecutor);
+
+ int toRegisterPathCount = computeSpecs(states, completionService);
for (int i = 0; i < toRegisterPathCount; i++) {
try {
for (HiveSpec spec : completionService.take().get()) {