This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ddd9468b3d Propagate file system job properties to work units (#4124)
ddd9468b3d is described below

commit ddd9468b3deda7ca0935c48796e05ed68f85d85c
Author: thisisArjit <[email protected]>
AuthorDate: Mon Jul 28 10:14:27 2025 +0530

    Propagate file system job properties to work units (#4124)
---
 .../temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java     |  2 +-
 .../ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java   |  8 ++++++--
 .../temporal/ddm/work/PriorJobStateWUProcessingSpec.java       |  5 +++--
 .../org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java |  4 +++-
 .../apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java   |  4 +++-
 .../temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java |  7 ++++++-
 .../ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java        | 10 +++++++---
 .../work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java   |  3 ++-
 8 files changed, 31 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 954269948f..64c21289ad 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -90,7 +90,7 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
       EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext.Builder()
           .withEventSubmitter(eventSubmitter)
           .build();
-      PriorJobStateWUProcessingSpec wuSpec = new 
PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(), 
eventSubmitterContext);
+      PriorJobStateWUProcessingSpec wuSpec = new 
PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(), 
eventSubmitterContext, this.jobProps);
       if 
(this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
           
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
         int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
index 24f6363093..8c1b1a94c2 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.temporal.ddm.work;
 import java.net.URI;
 import java.util.Comparator;
 import java.util.Optional;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -44,16 +45,19 @@ import org.apache.gobblin.util.WorkUnitSizeInfo;
 public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends 
AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
   private EventSubmitterContext eventSubmitterContext;
 
-  public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String 
hdfsDir, EventSubmitterContext eventSubmitterContext) {
+  private Properties fileSystemProperties;
+
+  public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String 
hdfsDir, EventSubmitterContext eventSubmitterContext, Properties 
fileSystemProperties) {
     super(fileSystemUri, hdfsDir);
     this.eventSubmitterContext = eventSubmitterContext;
+    this.fileSystemProperties = fileSystemProperties;
   }
 
   @Override
   protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
     // begin by setting all correlators to empty string - later we'll 
`acknowledgeOrdering()`
     Path filePath = fileStatus.getPath();
-    return new WorkUnitClaimCheck("", this.getFileSystemUri(), 
filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), 
this.eventSubmitterContext);
+    return new WorkUnitClaimCheck("", this.getFileSystemUri(), 
filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), 
this.eventSubmitterContext, fileSystemProperties);
   }
 
   @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
index 02ba87b3fa..0869e42037 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -51,8 +52,8 @@ public class PriorJobStateWUProcessingSpec extends 
WUProcessingSpec {
   @NonNull private List<Tag<?>> tags = new ArrayList<>();
   @NonNull private String metricsSuffix = 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
 
-  public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, 
EventSubmitterContext eventSubmitterContext) {
-    super(fileSystemUri, workUnitsDir, eventSubmitterContext);
+  public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, 
EventSubmitterContext eventSubmitterContext, Properties fileSystemProperties) {
+    super(fileSystemUri, workUnitsDir, eventSubmitterContext, 
fileSystemProperties);
   }
 
   @JsonIgnore
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 3a05e60f01..aca139094c 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
 
@@ -51,6 +52,7 @@ public class WUProcessingSpec implements FileSystemApt, 
FileSystemJobStateful {
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitsDir;
   @NonNull private EventSubmitterContext eventSubmitterContext;
+  @NonNull private Properties fileSystemProperties;
   @NonNull @Setter(AccessLevel.PUBLIC) private Tuning tuning = Tuning.DEFAULT;
 
   /** whether to conduct job-level timing (and send results via GTE) */
@@ -62,7 +64,7 @@ public class WUProcessingSpec implements FileSystemApt, 
FileSystemJobStateful {
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   @Override
   public State getFileSystemConfig() {
-    return new State(); // TODO - figure out how to truly set!
+    return new State(fileSystemProperties);
   }
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index 0c068cb3ed..2a6981e43a 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
 
@@ -59,11 +60,12 @@ public class WorkUnitClaimCheck implements FileSystemApt, 
FileSystemJobStateful
   @NonNull private String workUnitPath;
   @NonNull private WorkUnitSizeInfo workUnitSizeInfo;
   @NonNull private EventSubmitterContext eventSubmitterContext;
+  @NonNull private Properties fileSystemProperties;
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   @Override
   public State getFileSystemConfig() {
-    return new State(); // TODO - figure out how to truly set!
+    return new State(fileSystemProperties);
   }
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 198ab8a6d5..f1b4444d3b 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -85,6 +85,9 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     // Filtering only temporal job properties to pass to child workflows to 
avoid passing unnecessary properties
     final Properties temporalJobProps = 
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
         
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
+    // Add File system properties to the temporal job properties
+    
temporalJobProps.putAll(PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+        
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)));
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, 
temporalJobProps);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
@@ -197,7 +200,9 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     JobState jobState = new JobState(jobProps);
     URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
     Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-    WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+    Properties fsProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+        
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
+    WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext, fsProps);
     // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
     if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
         && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index e0119cfc5d..60aceee0e6 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -29,6 +29,7 @@ import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -46,6 +47,7 @@ import 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 @Slf4j
@@ -62,7 +64,7 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   }
 
   private CommitStats performWork(WUProcessingSpec workSpec, final Properties 
props) {
-    Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+    Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec, props);
     Map<String, Object> searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(props);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
 
@@ -121,9 +123,11 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
     }
   }
 
-  protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec 
workSpec) {
+  protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec 
workSpec, Properties props) {
+    Properties fsProps = PropertiesUtils.extractPropertiesWithPrefix(props,
+        
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
     return new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(), 
workSpec.getWorkUnitsDir(),
-        workSpec.getEventSubmitterContext());
+        workSpec.getEventSubmitterContext(), fsProps);
   }
 
   protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f,
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
index 6df60857eb..b1227021d9 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -53,7 +54,7 @@ public class EagerFsDirBackedWorkUnitClaimCheckWorkloadTest {
     URI fileSystemUri = new URI("hdfs://localhost:9000");
     String hdfsDir = "/test/dir";
     eventSubmitterContext = Mockito.mock(EventSubmitterContext.class);
-    workload = Mockito.spy(new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir, 
eventSubmitterContext));
+    workload = Mockito.spy(new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir, 
eventSubmitterContext, new Properties()));
     mockFileSystem = Mockito.mock(FileSystem.class);
 
     MockedStatic<HadoopUtils> mockedHadoopUtils = 
Mockito.mockStatic(HadoopUtils.class);

Reply via email to