This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 444f2664be [GOBBLIN-2144] Prevent `GenerateWorkUnitsImpl` from
inadvertently cleaning up temp/staging dirs (#4039)
444f2664be is described below
commit 444f2664be7fb3a44342cc481aaefb46542e8ede
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Aug 27 15:04:32 2024 -0700
[GOBBLIN-2144] Prevent `GenerateWorkUnitsImpl` from inadvertently cleaning
up temp/staging dirs (#4039)
---
.../modules/orchestration/DagManagementTaskStreamImpl.java | 1 +
.../temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java | 14 ++------------
2 files changed, 3 insertions(+), 12 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 18672a99f1..509218a10d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -110,6 +110,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
this.dagProcEngineMetrics = dagProcEngineMetrics;
}
+ @Override
public synchronized void addDagAction(DagActionStore.LeaseParams
leaseParams) {
log.info("Adding {} to queue...", leaseParams);
if (!this.leaseParamsQueue.offer(leaseParams)) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 0ed0675f64..242b2c69e5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -30,7 +30,6 @@ import com.google.common.io.Closer;
import io.temporal.failure.ApplicationFailure;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -117,9 +116,9 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
// TODO: count total bytes for progress tracking!
- boolean canCleanUp = canCleanStagingData(jobState);
+ boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup
DestinationDatasetHandlerService datasetHandlerService = closer.register(
- new DestinationDatasetHandlerService(jobState, canCleanUp, eventSubmitterContext.create()));
+ new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream =
datasetHandlerService.executeHandlers(workUnitStream);
// initialize writer and converter(s)
@@ -143,13 +142,4 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
}
-
- protected static boolean canCleanStagingData(JobState jobState) {
- if (DeliverySemantics.EXACTLY_ONCE.equals(DeliverySemantics.parse(jobState))) {
- String errMsg = "DeliverySemantics.EXACTLY_ONCE NOT currently supported; job " + jobState.getJobId();
- log.error(errMsg);
- throw ApplicationFailure.newNonRetryableFailure(errMsg, "Unsupported: DeliverySemantics.EXACTLY_ONCE");
- }
- return true;
- }
}