This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ac0ce754 [GOBBLIN-2028] make DagManagementStateStore return job status (#3905)
7ac0ce754 is described below

commit 7ac0ce7546228f751ed0bd2b4e96c7d22d93e576
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Mar 27 20:05:19 2024 -0700

    [GOBBLIN-2028] make DagManagementStateStore return job status (#3905)
    
    * add job status retriever in DMSS
    * update javadoc
---
 .../orchestration/DagManagementStateStore.java     | 14 ++++++--
 .../MostlyMySqlDagManagementStateStore.java        | 38 ++++++++++++++++++----
 .../DagManagementTaskStreamImplTest.java           |  2 +-
 .../orchestration/DagProcessingEngineTest.java     |  2 +-
 .../MostlyMySqlDagManagementStateStoreTest.java    | 18 ++++++++--
 .../modules/orchestration/OrchestratorTest.java    |  2 +-
 6 files changed, 62 insertions(+), 14 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index bf3b41f8f..86df6bee8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -25,6 +25,8 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.exception.QuotaExceededException;
 import org.apache.gobblin.runtime.api.FlowSpec;
@@ -32,6 +34,7 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
 
 
 /**
@@ -129,11 +132,11 @@ public interface DagManagementStateStore {
   void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId) throws IOException;
 
   /**
-   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}, or Optional.absent 
if it
-   * is not found.
+   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link 
JobStatus},
+   * or Optional.absent if it is not found.
    * @param dagNodeId of the dag node
    */
-  Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId);
+  Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId);
 
   /**
    * Returns a list of {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}.
@@ -186,4 +189,9 @@ public interface DagManagementStateStore {
    * Returns a {@link DagManagerMetrics} that monitors dags execution.
    */
   DagManagerMetrics getDagManagerMetrics();
+
+  /**
+   * @return {@link JobStatus} or {@link Optional#empty} if not present in the 
Job-Status Store
+   */
+  Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index ace6824db..7ddfdf616 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +28,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
@@ -41,6 +45,8 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -62,6 +68,7 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
   private final Map<DagManager.DagId, Long> dagToDeadline = new 
ConcurrentHashMap<>();
   private DagStateStore dagStateStore;
   private DagStateStore failedDagStateStore;
+  private JobStatusRetriever jobStatusRetriever;
   private boolean dagStoresInitialized = false;
   private final UserQuotaManager quotaManager;
   Map<URI, TopologySpec> topologySpecMap;
@@ -73,10 +80,12 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
   private final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
 
   @Inject
-  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog 
flowCatalog, UserQuotaManager userQuotaManager) {
+  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog 
flowCatalog, UserQuotaManager userQuotaManager,
+      JobStatusRetriever jobStatusRetriever) {
     this.quotaManager = userQuotaManager;
     this.config = config;
     this.flowCatalog = flowCatalog;
+    this.jobStatusRetriever = jobStatusRetriever;
     this.dagManagerMetrics.activate();
    }
 
@@ -192,7 +201,7 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
 
   @Override
   public Optional<Dag<JobExecutionPlan>> getDag(DagManager.DagId dagId) throws 
IOException {
-    return Optional.of(this.dagStateStore.getDag(dagId.toString()));
+    return Optional.ofNullable(this.dagStateStore.getDag(dagId.toString()));
   }
 
   @Override
@@ -200,12 +209,15 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
     return this.dagStateStore.existsDag(dagId);
   }
 
-  @Override
-  public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId 
dagNodeId) {
-    return Optional.of(this.dagNodes.get(dagNodeId));
+  public Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
+    Optional<JobStatus> jobStatus = getJobStatus(dagNodeId);
+    if (this.dagNodes.containsKey(dagNodeId) && jobStatus.isPresent()) {
+      return Optional.of(ImmutablePair.of(this.dagNodes.get(dagNodeId), 
jobStatus.get()));
+    } else {
+      return Optional.empty();
+    }
   }
 
-
   @Override
   public Optional<Dag<JobExecutionPlan>> 
getParentDag(Dag.DagNode<JobExecutionPlan> dagNode) {
     return Optional.of(this.jobToDag.get(dagNode));
@@ -235,4 +247,18 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
   public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException {
     return this.quotaManager.releaseQuota(dagNode);
   }
+
+  @Override
+  public Optional<JobStatus> getJobStatus(DagNodeId dagNodeId) {
+    Iterator<JobStatus> jobStatusIterator = 
this.jobStatusRetriever.getJobStatusesForFlowExecution(dagNodeId.getFlowName(),
+        dagNodeId.getFlowGroup(), dagNodeId.getFlowExecutionId(), 
dagNodeId.getJobName(), dagNodeId.getJobGroup());
+
+    if (jobStatusIterator.hasNext()) {
+      // there must exist exactly one job status for a dag node id, because 
fields of dag node id makes the primary key
+      // of the job status table
+      return Optional.of(jobStatusIterator.next());
+    } else {
+      return java.util.Optional.empty();
+    }
+  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 5a42e05d7..3ca35f4a3 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -72,7 +72,7 @@ public class DagManagementTaskStreamImplTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, null);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, Optional.empty(), 
Optional.of(mock(ReminderSettingDagProcLeaseArbiter.class)));
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index b2a72d05b..99f5bd577 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -76,7 +76,7 @@ public class DagProcessingEngineTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null);
+    this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, null);
     this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, Optional.empty(), null);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 6e3750c3b..af1532b8a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -26,6 +26,7 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.zaxxer.hikari.HikariDataSource;
 
@@ -37,9 +38,17 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 
 /**
@@ -78,7 +87,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
 
     Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
     Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getDag(dagId).get().toString());
-    Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNode(dagNodeId).get());
+    Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).get().getLeft());
     Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getParentDag(dagNode).get().toString());
 
     List<Dag.DagNode<JobExecutionPlan>> dagNodes = 
this.dagManagementStateStore.getDagNodes(dagId);
@@ -105,6 +114,11 @@ public class MostlyMySqlDagManagementStateStoreTest {
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
     Config config = configBuilder.build();
+    JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
+    JobStatus dummyJobStatus = 
JobStatus.builder().flowName("fn").flowGroup("fg").jobGroup("fg").jobName("job0")
+        .flowExecutionId(12345L).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name()).build();
+    
doReturn(Lists.newArrayList(dummyJobStatus).iterator()).when(jobStatusRetriever)
+        .getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), 
anyString(), anyString());
 
     // Constructing TopologySpecMap.
     Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
@@ -112,7 +126,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, jobStatusRetriever);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     return dagManagementStateStore;
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 8aacc6e0d..a9e5097c6 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -122,7 +122,7 @@ public class OrchestratorTest {
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE).build();
 
     MostlyMySqlDagManagementStateStore dagManagementStateStore =
-        new MostlyMySqlDagManagementStateStore(config, null, null);
+        new MostlyMySqlDagManagementStateStore(config, null, null, null);
 
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockStatusGenerator,

Reply via email to