This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bc222f5cf move dataset handler code before cleaning up staging data (#3594)
bc222f5cf is described below

commit bc222f5cf71d00b706a7db255ecf3c8dc3cfd9a3
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Oct 31 12:55:09 2022 -0500

    move dataset handler code before cleaning up staging data (#3594)
---
 .../gobblin/runtime/AbstractJobLauncher.java       | 27 +++++++++++-----------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 84cab042e..d7efdf549 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.runtime;
 
-import com.github.rholder.retry.RetryException;
-import com.google.common.eventbus.Subscribe;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.Authenticator;
@@ -35,14 +33,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.metrics.GobblinJobMetricReporter;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.source.InfiniteSource;
-import org.apache.gobblin.stream.WorkUnitChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.github.rholder.retry.RetryException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Function;
@@ -54,6 +49,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -92,10 +88,13 @@ import org.apache.gobblin.runtime.locks.JobLock;
 import org.apache.gobblin.runtime.locks.JobLockEventListener;
 import org.apache.gobblin.runtime.locks.JobLockException;
 import org.apache.gobblin.runtime.locks.LegacyJobLockFactoryManager;
+import org.apache.gobblin.runtime.metrics.GobblinJobMetricReporter;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
 import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.InfiniteSource;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.WorkUnitStreamSource;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -103,6 +102,7 @@ import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
 import org.apache.gobblin.util.ClusterNameTags;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
@@ -508,6 +508,13 @@ public abstract class AbstractJobLauncher implements JobLauncher {
           }
         }
 
+        // Perform work needed before writing is done
+        Boolean canCleanUp = 
this.canCleanStagingData(this.jobContext.getJobState());
+        try (DestinationDatasetHandlerService destinationDatasetHandlerService 
=
+            new DestinationDatasetHandlerService(jobState, canCleanUp, 
this.eventSubmitter)) {
+          workUnitStream = 
destinationDatasetHandlerService.executeHandlers(workUnitStream);
+        }
+
         //Initialize writer and converter(s)
         closer.register(WriterInitializerFactory.newInstace(jobState, 
workUnitStream)).initialize();
         closer.register(ConverterInitializerFactory.newInstance(jobState, 
workUnitStream)).initialize();
@@ -540,14 +547,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {
               
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
           // Add task ids
           workUnitStream = prepareWorkUnits(workUnitStream, jobState);
-
-          // Perform work needed before writing is done
-          Boolean canCleanUp = 
this.canCleanStagingData(this.jobContext.getJobState());
-          try (DestinationDatasetHandlerService 
destinationDatasetHandlerService =
-              new DestinationDatasetHandlerService(jobState, canCleanUp, 
this.eventSubmitter)) {
-            workUnitStream = 
destinationDatasetHandlerService.executeHandlers(workUnitStream);
-          }
-
           // Remove skipped workUnits from the list of work units to execute.
           workUnitStream = workUnitStream.filter(new 
SkippedWorkUnitsFilter(jobState));
           // Add surviving tasks to jobState

Reply via email to