This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 442b5b24cc serialize/deserialize curretnAttemps and currentGeneration 
in dag node (#4014)
442b5b24cc is described below

commit 442b5b24cc91ee16b1f03a6056f8fa7d6c3b923d
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Aug 2 16:33:47 2024 -0700

    serialize/deserialize curretnAttemps and currentGeneration in dag node 
(#4014)
---
 .../gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java   | 2 +-
 .../service/modules/spec/JobExecutionPlanListDeserializer.java        | 2 ++
 .../gobblin/service/modules/spec/JobExecutionPlanListSerializer.java  | 2 ++
 .../apache/gobblin/service/modules/spec/SerializationConstants.java   | 2 ++
 .../modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java     | 4 ++++
 .../service/modules/orchestration/proc/ReevaluateDagProcTest.java     | 3 +--
 6 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
index f95f65fab1..4dc0cb1f7f 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
@@ -44,7 +44,7 @@ import org.apache.gobblin.util.CompletedFuture;
 public class InMemorySpecExecutor extends AbstractSpecExecutor {
   // Communication mechanism components.
   // Not specifying final for further extension based on this implementation.
-  private SpecProducer<Spec> inMemorySpecProducer;
+  private final SpecProducer<Spec> inMemorySpecProducer;
 
   public InMemorySpecExecutor(Config config){
     this(config, Optional.absent());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index c51a9f3782..e5fffafc24 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -110,6 +110,8 @@ public class JobExecutionPlanListDeserializer implements 
JsonDeserializer<List<J
 
       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, 
specExecutor);
       jobExecutionPlan.setExecutionStatus(executionStatus);
+      
jobExecutionPlan.setCurrentGeneration(serializedJobExecutionPlan.get(SerializationConstants.CURRENT_GENERATION_KEY).getAsInt());
+      
jobExecutionPlan.setCurrentAttempts(serializedJobExecutionPlan.get(SerializationConstants.CURRENT_ATTEMPTS_KEY).getAsInt());
 
       JsonElement flowStartTime = 
serializedJobExecutionPlan.get(SerializationConstants.FLOW_START_TIME_KEY);
       if (flowStartTime != null) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
index 3d2fe36a56..d3cc778ede 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -82,6 +82,8 @@ public class JobExecutionPlanListSerializer implements 
JsonSerializer<List<JobEx
 
       String executionStatus = jobExecutionPlan.getExecutionStatus().name();
       
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY, 
executionStatus);
+      
jobExecutionPlanJson.addProperty(SerializationConstants.CURRENT_GENERATION_KEY, 
jobExecutionPlan.getCurrentGeneration());
+      
jobExecutionPlanJson.addProperty(SerializationConstants.CURRENT_ATTEMPTS_KEY, 
jobExecutionPlan.getCurrentAttempts());
 
       
jobExecutionPlanJson.addProperty(SerializationConstants.FLOW_START_TIME_KEY, 
jobExecutionPlan.getFlowStartTime());
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index 7cccf9f723..82c67f617e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -31,6 +31,8 @@ public class SerializationConstants {
   public static final String SPEC_EXECUTOR_URI_KEY = "uri";
 
   public static final String EXECUTION_STATUS_KEY = "executionStatus";
+  public static final String CURRENT_GENERATION_KEY = "currentGeneration";
+  public static final String CURRENT_ATTEMPTS_KEY = "currentAttempts";
   public static final String JOB_EXECUTION_FUTURE = "jobExecutionFuture";
   public static final String FLOW_START_TIME_KEY = "flowStartTime";
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
index 28687771f7..131186b3d1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
@@ -79,6 +79,8 @@ public class MysqlDagStateStoreWithDagNodesTest {
     Dag<JobExecutionPlan> originalDag2 = DagTestUtils.buildDag("random_2", 
456L);
     DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1);
     DagManager.DagId dagId2 = DagManagerUtils.generateDagId(originalDag2);
+    originalDag1.getStartNodes().get(0).getValue().setCurrentGeneration(2);
+    originalDag1.getStartNodes().get(0).getValue().setCurrentAttempts(3);
     this.dagStateStore.writeCheckpoint(originalDag1);
     this.dagStateStore.writeCheckpoint(originalDag2);
 
@@ -97,6 +99,8 @@ public class MysqlDagStateStoreWithDagNodesTest {
     Dag.DagNode<JobExecutionPlan> parent = 
dagDeserialized.getStartNodes().get(0);
     Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
     
Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+    
Assert.assertEquals(dagDeserialized.getStartNodes().get(0).getValue().getCurrentGeneration(),
 2);
+    
Assert.assertEquals(dagDeserialized.getStartNodes().get(0).getValue().getCurrentAttempts(),
 3);
 
     for (int i = 0; i < 2; i++) {
       JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 9673381b26..d2438bd64c 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -49,6 +48,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
@@ -187,7 +187,6 @@ public class ReevaluateDagProcTest {
   @Test
   public void testCurrentJobToRun() throws Exception {
     String flowName = "fn3";
-    DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName, 
flowExecutionId);
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
         2, "user5", ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))

Reply via email to