This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1c6c24fe47 [GOBBLIN-2207] Make temporal unique classifier GaaS attempt aware (#4116)
1c6c24fe47 is described below
commit 1c6c24fe47c1e4df3bfbdd60c161751248c9702c
Author: pratapaditya04 <[email protected]>
AuthorDate: Mon May 12 12:08:11 2025 +0530
[GOBBLIN-2207] Make temporal unique classifier GaaS attempt aware (#4116)
* Made temporal unique classifier GaaS attempt aware
---
.../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 +-
.../src/main/java/org/apache/gobblin/cluster/HelixUtils.java | 4 ++--
.../src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java | 4 ++--
.../org/apache/gobblin/service/modules/orchestration/DagUtils.java | 2 +-
.../java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java | 5 ++++-
.../gobblin/temporal/workflows/metrics/EventSubmitterContext.java | 4 ++--
6 files changed, 12 insertions(+), 9 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 1e9144640b..03f1a2629e 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -191,7 +191,7 @@ public class ConfigurationKeys {
public static final String JOB_GROUP_KEY = "job.group";
public static final String JOB_TAG_KEY = "job.tag";
public static final String JOB_DESCRIPTION_KEY = "job.description";
- public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
+ public static final String JOB_ATTEMPT_ID = "job.attemptId";
public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
// Job launcher type
public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 45c3685d17..cd0d2d0e7c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -183,9 +183,9 @@ public class HelixUtils {
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobExecutionId)));
}
- if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
+ jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION,
"1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 9ced348ac4..e1e3dcfbc7 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -399,9 +399,9 @@ public class AzkabanJobLauncher extends AbstractJob
implements ApplicationLaunch
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
- if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
+ jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION,
"1")));
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java
index a2f5e1062d..a5a0850217 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java
@@ -128,7 +128,7 @@ public class DagUtils {
public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
JobSpec jobSpec = dagNode.getValue().getJobSpec();
- Map<String, String> configWithCurrentAttempts =
ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS,
String.valueOf(dagNode.getValue().getCurrentAttempts()),
+ Map<String, String> configWithCurrentAttempts =
ImmutableMap.of(ConfigurationKeys.JOB_ATTEMPT_ID,
String.valueOf(dagNode.getValue().getCurrentAttempts()),
ConfigurationKeys.JOB_CURRENT_GENERATION,
String.valueOf(dagNode.getValue().getCurrentGeneration()));
Properties configAsProperties = (Properties)
jobSpec.getConfigAsProperties().clone();
configAsProperties.putAll(configWithCurrentAttempts);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 2a2cf17049..8ee9a69091 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -62,6 +62,7 @@ public class Help {
public static final String USER_TO_PROXY_KEY = "user.to.proxy";
public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
+ public static final String DEFAULT_GAAS_ATTEMPT_ID = "1";
// treat `JobState` as immutable and cache, for reuse among activities
executed by the same worker
private static final transient Cache<Path, JobState> jobStateByPath =
CacheBuilder.newBuilder().recordStats().build();
@@ -105,7 +106,9 @@ public class Help {
? workerConfig.getString(USER_TO_PROXY_KEY) : "";
String gaasFlowExecId =
workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID)
? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) :
UUID.randomUUID().toString();
- return userToProxy + "_" + gaasFlowExecId;
+ String gaasAttemptId =
workerConfig.hasPath(ConfigurationKeys.JOB_ATTEMPT_ID)
+ ? workerConfig.getString(ConfigurationKeys.JOB_ATTEMPT_ID) :
DEFAULT_GAAS_ATTEMPT_ID;
+ return String.join("_", userToProxy, gaasFlowExecId, gaasAttemptId);
}
public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
index 6ef80f00d8..e23b5dab7e 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
@@ -109,9 +109,9 @@ public class EventSubmitterContext {
this.tags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
}
- if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
this.tags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS,
"1")));
+ jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
this.tags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION,
"1")));
this.tags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,