This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ddd9468b3d Propagate file system job properties to Work units (#4124)
ddd9468b3d is described below
commit ddd9468b3deda7ca0935c48796e05ed68f85d85c
Author: thisisArjit <[email protected]>
AuthorDate: Mon Jul 28 10:14:27 2025 +0530
Propagate file system job properties to Work units (#4124)
---
.../temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java | 2 +-
.../ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java | 8 ++++++--
.../temporal/ddm/work/PriorJobStateWUProcessingSpec.java | 5 +++--
.../org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java | 4 +++-
.../apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java | 4 +++-
.../temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java | 7 ++++++-
.../ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java | 10 +++++++---
.../work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java | 3 ++-
8 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 954269948f..64c21289ad 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -90,7 +90,7 @@ public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext.Builder()
.withEventSubmitter(eventSubmitter)
.build();
- PriorJobStateWUProcessingSpec wuSpec = new
PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(),
eventSubmitterContext);
+ PriorJobStateWUProcessingSpec wuSpec = new
PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(),
eventSubmitterContext, this.jobProps);
if
(this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
index 24f6363093..8c1b1a94c2 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
import java.util.Comparator;
import java.util.Optional;
+import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -44,16 +45,19 @@ import org.apache.gobblin.util.WorkUnitSizeInfo;
public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends
AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
private EventSubmitterContext eventSubmitterContext;
- public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String
hdfsDir, EventSubmitterContext eventSubmitterContext) {
+ private Properties fileSystemProperties;
+
+ public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String
hdfsDir, EventSubmitterContext eventSubmitterContext, Properties
fileSystemProperties) {
super(fileSystemUri, hdfsDir);
this.eventSubmitterContext = eventSubmitterContext;
+ this.fileSystemProperties = fileSystemProperties;
}
@Override
protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
// begin by setting all correlators to empty string - later we'll
`acknowledgeOrdering()`
Path filePath = fileStatus.getPath();
- return new WorkUnitClaimCheck("", this.getFileSystemUri(),
filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath),
this.eventSubmitterContext);
+ return new WorkUnitClaimCheck("", this.getFileSystemUri(),
filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath),
this.eventSubmitterContext, fileSystemProperties);
}
@Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
index 02ba87b3fa..0869e42037 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -51,8 +52,8 @@ public class PriorJobStateWUProcessingSpec extends WUProcessingSpec {
@NonNull private List<Tag<?>> tags = new ArrayList<>();
@NonNull private String metricsSuffix =
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
- public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir,
EventSubmitterContext eventSubmitterContext) {
- super(fileSystemUri, workUnitsDir, eventSubmitterContext);
+ public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir,
EventSubmitterContext eventSubmitterContext, Properties fileSystemProperties) {
+ super(fileSystemUri, workUnitsDir, eventSubmitterContext,
fileSystemProperties);
}
@JsonIgnore
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 3a05e60f01..aca139094c 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
+import java.util.Properties;
import org.apache.hadoop.fs.Path;
@@ -51,6 +52,7 @@ public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful {
@NonNull private URI fileSystemUri;
@NonNull private String workUnitsDir;
@NonNull private EventSubmitterContext eventSubmitterContext;
+ @NonNull private Properties fileSystemProperties;
@NonNull @Setter(AccessLevel.PUBLIC) private Tuning tuning = Tuning.DEFAULT;
/** whether to conduct job-level timing (and send results via GTE) */
@@ -62,7 +64,7 @@ public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful {
@JsonIgnore // (because no-arg method resembles 'java bean property')
@Override
public State getFileSystemConfig() {
- return new State(); // TODO - figure out how to truly set!
+ return new State(fileSystemProperties);
}
@JsonIgnore // (because no-arg method resembles 'java bean property')
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index 0c068cb3ed..2a6981e43a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
+import java.util.Properties;
import org.apache.hadoop.fs.Path;
@@ -59,11 +60,12 @@ public class WorkUnitClaimCheck implements FileSystemApt, FileSystemJobStateful
@NonNull private String workUnitPath;
@NonNull private WorkUnitSizeInfo workUnitSizeInfo;
@NonNull private EventSubmitterContext eventSubmitterContext;
+ @NonNull private Properties fileSystemProperties;
@JsonIgnore // (because no-arg method resembles 'java bean property')
@Override
public State getFileSystemConfig() {
- return new State(); // TODO - figure out how to truly set!
+ return new State(fileSystemProperties);
}
@JsonIgnore // (because no-arg method resembles 'java bean property')
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 198ab8a6d5..f1b4444d3b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -85,6 +85,9 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
// Filtering only temporal job properties to pass to child workflows to
avoid passing unnecessary properties
final Properties temporalJobProps =
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
+ // Add File system properties to the temporal job properties
+
temporalJobProps.putAll(PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)));
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext,
temporalJobProps);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); //
update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
@@ -197,7 +200,9 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
JobState jobState = new JobState(jobProps);
URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ Properties fsProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext, fsProps);
// TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index e0119cfc5d..60aceee0e6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -29,6 +29,7 @@ import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -46,6 +47,7 @@ import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.util.PropertiesUtils;
@Slf4j
@@ -62,7 +64,7 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
}
private CommitStats performWork(WUProcessingSpec workSpec, final Properties
props) {
- Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+ Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec, props);
Map<String, Object> searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(props);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec, searchAttributes);
@@ -121,9 +123,11 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
}
}
- protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec
workSpec) {
+ protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec
workSpec, Properties props) {
+ Properties fsProps = PropertiesUtils.extractPropertiesWithPrefix(props,
+
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
return new
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(),
workSpec.getWorkUnitsDir(),
- workSpec.getEventSubmitterContext());
+ workSpec.getEventSubmitterContext(), fsProps);
}
protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f,
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
index 6df60857eb..b1227021d9 100644
--- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
@@ -21,6 +21,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -53,7 +54,7 @@ public class EagerFsDirBackedWorkUnitClaimCheckWorkloadTest {
URI fileSystemUri = new URI("hdfs://localhost:9000");
String hdfsDir = "/test/dir";
eventSubmitterContext = Mockito.mock(EventSubmitterContext.class);
- workload = Mockito.spy(new
EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir,
eventSubmitterContext));
+ workload = Mockito.spy(new
EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir,
eventSubmitterContext, new Properties()));
mockFileSystem = Mockito.mock(FileSystem.class);
MockedStatic<HadoopUtils> mockedHadoopUtils =
Mockito.mockStatic(HadoopUtils.class);