This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bc222f5cf move dataset handler code before cleaning up staging data (#3594)
bc222f5cf is described below
commit bc222f5cf71d00b706a7db255ecf3c8dc3cfd9a3
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Oct 31 12:55:09 2022 -0500
move dataset handler code before cleaning up staging data (#3594)
---
.../gobblin/runtime/AbstractJobLauncher.java | 27 +++++++++++-----------
1 file changed, 13 insertions(+), 14 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 84cab042e..d7efdf549 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.runtime;
-import com.github.rholder.retry.RetryException;
-import com.google.common.eventbus.Subscribe;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.Authenticator;
@@ -35,14 +33,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.metrics.GobblinJobMetricReporter;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.source.InfiniteSource;
-import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import com.github.rholder.retry.RetryException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
@@ -54,6 +49,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -92,10 +88,13 @@ import org.apache.gobblin.runtime.locks.JobLock;
import org.apache.gobblin.runtime.locks.JobLockEventListener;
import org.apache.gobblin.runtime.locks.JobLockException;
import org.apache.gobblin.runtime.locks.LegacyJobLockFactoryManager;
+import org.apache.gobblin.runtime.metrics.GobblinJobMetricReporter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.InfiniteSource;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -103,6 +102,7 @@ import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.apache.gobblin.util.ClusterNameTags;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -508,6 +508,13 @@ public abstract class AbstractJobLauncher implements JobLauncher {
}
}
+ // Perform work needed before writing is done
+ Boolean canCleanUp =
this.canCleanStagingData(this.jobContext.getJobState());
+ try (DestinationDatasetHandlerService destinationDatasetHandlerService
=
+ new DestinationDatasetHandlerService(jobState, canCleanUp,
this.eventSubmitter)) {
+ workUnitStream =
destinationDatasetHandlerService.executeHandlers(workUnitStream);
+ }
+
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState,
workUnitStream)).initialize();
closer.register(ConverterInitializerFactory.newInstance(jobState,
workUnitStream)).initialize();
@@ -540,14 +547,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
// Add task ids
workUnitStream = prepareWorkUnits(workUnitStream, jobState);
-
- // Perform work needed before writing is done
- Boolean canCleanUp =
this.canCleanStagingData(this.jobContext.getJobState());
- try (DestinationDatasetHandlerService
destinationDatasetHandlerService =
- new DestinationDatasetHandlerService(jobState, canCleanUp,
this.eventSubmitter)) {
- workUnitStream =
destinationDatasetHandlerService.executeHandlers(workUnitStream);
- }
-
// Remove skipped workUnits from the list of work units to execute.
workUnitStream = workUnitStream.filter(new
SkippedWorkUnitsFilter(jobState));
// Add surviving tasks to jobState