This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7ac0ce754 [GOBBLIN-2028] make DagManagementStateStore return job
status (#3905)
7ac0ce754 is described below
commit 7ac0ce7546228f751ed0bd2b4e96c7d22d93e576
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Mar 27 20:05:19 2024 -0700
[GOBBLIN-2028] make DagManagementStateStore return job status (#3905)
* add a job status retriever to DagManagementStateStore (DMSS)
* update javadoc
---
.../orchestration/DagManagementStateStore.java | 14 ++++++--
.../MostlyMySqlDagManagementStateStore.java | 38 ++++++++++++++++++----
.../DagManagementTaskStreamImplTest.java | 2 +-
.../orchestration/DagProcessingEngineTest.java | 2 +-
.../MostlyMySqlDagManagementStateStoreTest.java | 18 ++++++++--
.../modules/orchestration/OrchestratorTest.java | 2 +-
6 files changed, 62 insertions(+), 14 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index bf3b41f8f..86df6bee8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -25,6 +25,8 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.runtime.api.FlowSpec;
@@ -32,6 +34,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
/**
@@ -129,11 +132,11 @@ public interface DagManagementStateStore {
void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId
dagId) throws IOException;
/**
- * Returns the requested {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}, or Optional.absent
if it
- * is not found.
+ * Returns the requested {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} together with its {@link
JobStatus},
+ * or {@link Optional#empty()} if either the dag node or its job status is not found.
* @param dagNodeId of the dag node
*/
- Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId);
+ Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId);
/**
* Returns a list of {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}.
@@ -186,4 +189,9 @@ public interface DagManagementStateStore {
* Returns a {@link DagManagerMetrics} that monitors dags execution.
*/
DagManagerMetrics getDagManagerMetrics();
+
+ /**
+ * @return the {@link JobStatus} of the job identified by {@code dagNodeId}, or {@link Optional#empty} if it is not present in the
job status store
+ */
+ Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index ace6824db..7ddfdf616 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -27,6 +28,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.typesafe.config.Config;
@@ -41,6 +45,8 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -62,6 +68,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
private final Map<DagManager.DagId, Long> dagToDeadline = new
ConcurrentHashMap<>();
private DagStateStore dagStateStore;
private DagStateStore failedDagStateStore;
+ private JobStatusRetriever jobStatusRetriever;
private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
Map<URI, TopologySpec> topologySpecMap;
@@ -73,10 +80,12 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
private final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
@Inject
- public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog
flowCatalog, UserQuotaManager userQuotaManager) {
+ public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog
flowCatalog, UserQuotaManager userQuotaManager,
+ JobStatusRetriever jobStatusRetriever) {
this.quotaManager = userQuotaManager;
this.config = config;
this.flowCatalog = flowCatalog;
+ this.jobStatusRetriever = jobStatusRetriever;
this.dagManagerMetrics.activate();
}
@@ -192,7 +201,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
@Override
public Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws
IOException {
- return Optional.of(this.dagStateStore.getDag(dagId.toString()));
+ return Optional.ofNullable(this.dagStateStore.getDag(dagId.toString()));
}
@Override
@@ -200,12 +209,15 @@ public class MostlyMySqlDagManagementStateStore
implements DagManagementStateSto
return this.dagStateStore.existsDag(dagId);
}
- @Override
- public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId
dagNodeId) {
- return Optional.of(this.dagNodes.get(dagNodeId));
+ public Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
+ Optional<JobStatus> jobStatus = getJobStatus(dagNodeId);
+ if (this.dagNodes.containsKey(dagNodeId) && jobStatus.isPresent()) {
+ return Optional.of(ImmutablePair.of(this.dagNodes.get(dagNodeId),
jobStatus.get()));
+ } else {
+ return Optional.empty();
+ }
}
-
@Override
public Optional<Dag<JobExecutionPlan>>
getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
return Optional.of(this.jobToDag.get(dagNode));
@@ -235,4 +247,18 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException {
return this.quotaManager.releaseQuota(dagNode);
}
+
+ @Override
+ public Optional<JobStatus> getJobStatus(DagNodeId dagNodeId) {
+ Iterator<JobStatus> jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(dagNodeId.getFlowName(),
+ dagNodeId.getFlowGroup(), dagNodeId.getFlowExecutionId(),
dagNodeId.getJobName(), dagNodeId.getJobGroup());
+
+ if (jobStatusIterator.hasNext()) {
+ // at most one job status can exist for a dag node id, because the fields
of a dag node id make up the primary key
+ // of the job status table, so the first element is the only one
+ return Optional.of(jobStatusIterator.next());
+ } else {
+ return java.util.Optional.empty();
+ }
+ }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 5a42e05d7..3ca35f4a3 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -72,7 +72,7 @@ public class DagManagementTaskStreamImplTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, null);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config, Optional.empty(),
Optional.of(mock(ReminderSettingDagProcLeaseArbiter.class)));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index b2a72d05b..99f5bd577 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -76,7 +76,7 @@ public class DagProcessingEngineTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null);
+ this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, null);
this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config, Optional.empty(), null);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 6e3750c3b..af1532b8a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -26,6 +26,7 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
@@ -37,9 +38,17 @@ import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
/**
@@ -78,7 +87,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
Assert.assertEquals(dag.toString(),
this.dagManagementStateStore.getDag(dagId).get().toString());
- Assert.assertEquals(dagNode,
this.dagManagementStateStore.getDagNode(dagNodeId).get());
+ Assert.assertEquals(dagNode,
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).get().getLeft());
Assert.assertEquals(dag.toString(),
this.dagManagementStateStore.getParentDag(dagNode).get().toString());
List<Dag.DagNode<JobExecutionPlan>> dagNodes =
this.dagManagementStateStore.getDagNodes(dagId);
@@ -105,6 +114,11 @@ public class MostlyMySqlDagManagementStateStoreTest {
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
Config config = configBuilder.build();
+ JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
+ JobStatus dummyJobStatus =
JobStatus.builder().flowName("fn").flowGroup("fg").jobGroup("fg").jobName("job0")
+ .flowExecutionId(12345L).message("Test
message").eventName(ExecutionStatus.COMPLETE.name()).build();
+
doReturn(Lists.newArrayList(dummyJobStatus).iterator()).when(jobStatusRetriever)
+ .getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(),
anyString(), anyString());
// Constructing TopologySpecMap.
Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
@@ -112,7 +126,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, jobStatusRetriever);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 8aacc6e0d..a9e5097c6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -122,7 +122,7 @@ public class OrchestratorTest {
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE).build();
MostlyMySqlDagManagementStateStore dagManagementStateStore =
- new MostlyMySqlDagManagementStateStore(config, null, null);
+ new MostlyMySqlDagManagementStateStore(config, null, null, null);
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, mockDagManager, Optional.of(logger),
mockStatusGenerator,