This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 442b5b24cc serialize/deserialize currentAttempts and currentGeneration
in dag node (#4014)
442b5b24cc is described below
commit 442b5b24cc91ee16b1f03a6056f8fa7d6c3b923d
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Aug 2 16:33:47 2024 -0700
serialize/deserialize currentAttempts and currentGeneration in dag node
(#4014)
---
.../gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java | 2 +-
.../service/modules/spec/JobExecutionPlanListDeserializer.java | 2 ++
.../gobblin/service/modules/spec/JobExecutionPlanListSerializer.java | 2 ++
.../apache/gobblin/service/modules/spec/SerializationConstants.java | 2 ++
.../modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java | 4 ++++
.../service/modules/orchestration/proc/ReevaluateDagProcTest.java | 3 +--
6 files changed, 12 insertions(+), 3 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
index f95f65fab1..4dc0cb1f7f 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java
@@ -44,7 +44,7 @@ import org.apache.gobblin.util.CompletedFuture;
public class InMemorySpecExecutor extends AbstractSpecExecutor {
// Communication mechanism components.
// Not specifying final for further extension based on this implementation.
- private SpecProducer<Spec> inMemorySpecProducer;
+ private final SpecProducer<Spec> inMemorySpecProducer;
public InMemorySpecExecutor(Config config){
this(config, Optional.absent());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index c51a9f3782..e5fffafc24 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -110,6 +110,8 @@ public class JobExecutionPlanListDeserializer implements
JsonDeserializer<List<J
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec,
specExecutor);
jobExecutionPlan.setExecutionStatus(executionStatus);
+
jobExecutionPlan.setCurrentGeneration(serializedJobExecutionPlan.get(SerializationConstants.CURRENT_GENERATION_KEY).getAsInt());
+
jobExecutionPlan.setCurrentAttempts(serializedJobExecutionPlan.get(SerializationConstants.CURRENT_ATTEMPTS_KEY).getAsInt());
JsonElement flowStartTime =
serializedJobExecutionPlan.get(SerializationConstants.FLOW_START_TIME_KEY);
if (flowStartTime != null) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
index 3d2fe36a56..d3cc778ede 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -82,6 +82,8 @@ public class JobExecutionPlanListSerializer implements
JsonSerializer<List<JobEx
String executionStatus = jobExecutionPlan.getExecutionStatus().name();
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY,
executionStatus);
+
jobExecutionPlanJson.addProperty(SerializationConstants.CURRENT_GENERATION_KEY,
jobExecutionPlan.getCurrentGeneration());
+
jobExecutionPlanJson.addProperty(SerializationConstants.CURRENT_ATTEMPTS_KEY,
jobExecutionPlan.getCurrentAttempts());
jobExecutionPlanJson.addProperty(SerializationConstants.FLOW_START_TIME_KEY,
jobExecutionPlan.getFlowStartTime());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index 7cccf9f723..82c67f617e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -31,6 +31,8 @@ public class SerializationConstants {
public static final String SPEC_EXECUTOR_URI_KEY = "uri";
public static final String EXECUTION_STATUS_KEY = "executionStatus";
+ public static final String CURRENT_GENERATION_KEY = "currentGeneration";
+ public static final String CURRENT_ATTEMPTS_KEY = "currentAttempts";
public static final String JOB_EXECUTION_FUTURE = "jobExecutionFuture";
public static final String FLOW_START_TIME_KEY = "flowStartTime";
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
index 28687771f7..131186b3d1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
@@ -79,6 +79,8 @@ public class MysqlDagStateStoreWithDagNodesTest {
Dag<JobExecutionPlan> originalDag2 = DagTestUtils.buildDag("random_2",
456L);
DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1);
DagManager.DagId dagId2 = DagManagerUtils.generateDagId(originalDag2);
+ originalDag1.getStartNodes().get(0).getValue().setCurrentGeneration(2);
+ originalDag1.getStartNodes().get(0).getValue().setCurrentAttempts(3);
this.dagStateStore.writeCheckpoint(originalDag1);
this.dagStateStore.writeCheckpoint(originalDag2);
@@ -97,6 +99,8 @@ public class MysqlDagStateStoreWithDagNodesTest {
Dag.DagNode<JobExecutionPlan> parent =
dagDeserialized.getStartNodes().get(0);
Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1);
Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
Assert.assertEquals(dagDeserialized.getStartNodes().get(0).getValue().getCurrentGeneration(),
2);
+
Assert.assertEquals(dagDeserialized.getStartNodes().get(0).getValue().getCurrentAttempts(),
3);
for (int i = 0; i < 2; i++) {
JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 9673381b26..d2438bd64c 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -49,6 +48,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
@@ -187,7 +187,6 @@ public class ReevaluateDagProcTest {
@Test
public void testCurrentJobToRun() throws Exception {
String flowName = "fn3";
- DagManager.DagId dagId = new DagManager.DagId(flowGroup, flowName,
flowExecutionId);
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
2, "user5", ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))