This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dcbe779c69 [GOBBLIN-2211] Use correct jobExecId when fetching
jobStatus (#4169)
dcbe779c69 is described below
commit dcbe779c69c30c1d9941cc717fac5f63e5b9f774
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Feb 27 12:25:55 2026 +0530
[GOBBLIN-2211] Use correct jobExecId when fetching jobStatus (#4169)
JobStatusRetriever.getJobExecutionId was reading from
JOB_EXECUTION_ID_FIELD instead of GAAS_JOB_EXEC_ID_HASH, causing incorrect job
execution IDs to be resolved in some flows. This fix reads the correct
property with safe fallback handling for missing or non-numeric values, and
adds unit tests covering the various input cases.
---
.../service/monitoring/JobStatusRetriever.java | 17 +++-
.../JobStatusRetrieverGetJobExecutionIdTest.java | 97 ++++++++++++++++++++++
.../service/monitoring/JobStatusRetrieverTest.java | 2 +
3 files changed, 115 insertions(+), 1 deletion(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 98889ac70c..fa56a57796 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -35,7 +35,10 @@ import com.typesafe.config.ConfigFactory;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.StateStore;
@@ -182,8 +185,20 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
return jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
}
+ /**
+ * Resolves the job execution id from the job state. Uses {@link
ConfigurationKeys#GAAS_JOB_EXEC_ID_HASH} when present and numeric;
+ * otherwise returns 0 as the default.
+ */
protected static final long getJobExecutionId(State jobState) {
- return
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD,
"0"));
+ String jobExecId =
jobState.getProp(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, "0");
+ if (StringUtils.isNotBlank(jobExecId)) {
+ try {
+ return Long.parseLong(jobExecId);
+ } catch (NumberFormatException e) {
+ log.error("gaas.job.executionid.hash is not numeric, deriving long
from string: {}", jobExecId, e);
+ }
+ }
+ return 0;
}
protected Iterator<JobStatus> asJobStatuses(List<State> jobStatusStates) {
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverGetJobExecutionIdTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverGetJobExecutionIdTest.java
new file mode 100644
index 0000000000..b311899f52
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverGetJobExecutionIdTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * Unit tests for {@link JobStatusRetriever#getJobExecutionId(State)}.
+ *
+ * The method is {@code protected static}, so this test class lives in the
same package to access it directly.
+ */
+public class JobStatusRetrieverGetJobExecutionIdTest {
+
+ /** Returns a {@link State} with the given property value set for {@link
ConfigurationKeys#GAAS_JOB_EXEC_ID_HASH}. */
+ private static State stateWith(String value) {
+ Properties props = new Properties();
+ props.setProperty(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, value);
+ return new State(props);
+ }
+
+ @Test
+ public void testValidId() {
+
Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("12345")),
12345L);
+ }
+
+ @Test
+ public void testPropertyNotSet_returnsZero() {
+ // No property set — getProp defaults to "0", which parses to 0L
+ Assert.assertEquals(JobStatusRetriever.getJobExecutionId(new State()), 0L);
+ }
+
+ @Test
+ public void testExplicitZero_returnsZero() {
+ Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("0")),
0L);
+ }
+
+ @Test
+ public void testNonNumericString_returnsZero() {
+ // NumberFormatException is caught internally; method falls back to 0
+
Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("not-a-number")),
0L);
+ }
+
+ @Test
+ public void testEmptyString_returnsZero() {
+ // Empty string fails the StringUtils.isNotBlank() guard; returns default 0
+ Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("")),
0L);
+ }
+
+ @Test
+ public void testNegativeNumber_returnsParsedValue() {
+
Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("-42")),
-42L);
+ }
+
+ @Test
+ public void testMaxLong_returnsParsedValue() {
+
Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith(String.valueOf(Long.MAX_VALUE))),
Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testLargePositiveId() {
+
Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("9876543210")),
9876543210L);
+ }
+
+ @Test
+ public void testHexString_returnsZero() {
+ // Hex strings are not valid for Long.parseLong without radix
+
Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("0xFF")),
0L);
+ }
+
+ @Test
+ public void testFloatString_returnsZero() {
+ // Floating point strings are not valid for Long.parseLong
+
Assert.assertEquals(JobStatusRetriever.getJobExecutionId(stateWith("3.14")),
0L);
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 85891023b1..42c323afce 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -30,6 +30,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
@@ -87,6 +88,7 @@ public abstract class JobStatusRetrieverTest {
properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
jobGroup = MY_JOB_GROUP;
+ properties.setProperty(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH,
String.valueOf(JOB_EXECUTION_ID));
properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD,
String.valueOf(JOB_EXECUTION_ID));
properties.setProperty(TimingEvent.METADATA_MESSAGE, MESSAGE);
} else {