This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 444f2664be [GOBBLIN-2144] Prevent `GenerateWorkUnitsImpl` from inadvertently cleaning up temp/staging dirs (#4039)
444f2664be is described below

commit 444f2664be7fb3a44342cc481aaefb46542e8ede
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Aug 27 15:04:32 2024 -0700

    [GOBBLIN-2144] Prevent `GenerateWorkUnitsImpl` from inadvertently cleaning up temp/staging dirs (#4039)
---
 .../modules/orchestration/DagManagementTaskStreamImpl.java |  1 +
 .../temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java  | 14 ++------------
 2 files changed, 3 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 18672a99f1..509218a10d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -110,6 +110,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     this.dagProcEngineMetrics = dagProcEngineMetrics;
   }
 
+  @Override
   public synchronized void addDagAction(DagActionStore.LeaseParams 
leaseParams) {
     log.info("Adding {} to queue...", leaseParams);
     if (!this.leaseParamsQueue.offer(leaseParams)) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 0ed0675f64..242b2c69e5 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -30,7 +30,6 @@ import com.google.common.io.Closer;
 import io.temporal.failure.ApplicationFailure;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -117,9 +116,9 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
 
     // TODO: count total bytes for progress tracking!
 
-    boolean canCleanUp = canCleanStagingData(jobState);
+    boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` 
running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for 
cleanup
     DestinationDatasetHandlerService datasetHandlerService = closer.register(
-        new DestinationDatasetHandlerService(jobState, canCleanUp, 
eventSubmitterContext.create()));
+        new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, 
eventSubmitterContext.create()));
     WorkUnitStream handledWorkUnitStream = 
datasetHandlerService.executeHandlers(workUnitStream);
 
     // initialize writer and converter(s)
@@ -143,13 +142,4 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
 
     return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
   }
-
-  protected static boolean canCleanStagingData(JobState jobState) {
-    if 
(DeliverySemantics.EXACTLY_ONCE.equals(DeliverySemantics.parse(jobState))) {
-      String errMsg = "DeliverySemantics.EXACTLY_ONCE NOT currently supported; 
job " + jobState.getJobId();
-      log.error(errMsg);
-      throw ApplicationFailure.newNonRetryableFailure(errMsg, "Unsupported: 
DeliverySemantics.EXACTLY_ONCE");
-    }
-    return true;
-  }
 }

Reply via email to