This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1258cc643 [GOBBLIN-2084] change flow execution id to long everywhere it is possible (#3967)
1258cc643 is described below

commit 1258cc643de79772a2bb8bb2f9c411e59641b22e
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 11 19:25:12 2024 -0700

    [GOBBLIN-2084] change flow execution id to long everywhere it is possible (#3967)
    
    * change flow execution id to long everywhere it is possible
---
 .../runtime/DagActionStoreChangeMonitorTest.java   | 26 ++++++---------------
 .../runtime/KafkaAvroJobStatusMonitorTest.java     |  4 ++--
 .../service/StreamingKafkaSpecExecutorTest.java    |  6 ++---
 .../org/apache/gobblin/runtime/api/FlowSpec.java   | 27 ++++++++++++----------
 .../spec_executorInstance/LocalFsSpecProducer.java | 10 +++++---
 .../apache/gobblin/runtime/api/FlowSpecTest.java   | 15 +++++++-----
 .../orchestration/DagActionReminderScheduler.java  |  2 +-
 .../modules/orchestration/DagActionStore.java      | 20 ++++++++--------
 .../orchestration/DagManagementStateStore.java     |  8 +++----
 .../service/modules/orchestration/DagManager.java  |  4 ++--
 .../modules/orchestration/DagManagerUtils.java     |  8 ++-----
 .../MostlyMySqlDagManagementStateStore.java        |  6 ++---
 .../modules/orchestration/MysqlDagActionStore.java | 20 ++++++++--------
 .../modules/orchestration/Orchestrator.java        |  2 +-
 .../modules/orchestration/proc/DagProcUtils.java   |  4 ++--
 .../orchestration/proc/ReevaluateDagProc.java      |  2 +-
 ...lowExecutionResourceHandlerWithWarmStandby.java |  4 ++--
 .../monitoring/DagActionStoreChangeMonitor.java    |  8 +++----
 .../service/monitoring/KafkaJobStatusMonitor.java  | 15 +++++-------
 .../monitoring/LocalFsJobStatusRetriever.java      |  2 +-
 .../DagActionReminderSchedulerTest.java            |  8 ++++---
 .../DagManagementTaskStreamImplTest.java           |  2 +-
 .../modules/orchestration/DagManagerFlowTest.java  | 17 +++++++-------
 .../orchestration/DagProcessingEngineTest.java     |  6 ++---
 .../orchestration/FlowLaunchHandlerTest.java       |  2 +-
 .../orchestration/MysqlDagActionStoreTest.java     |  6 ++---
 .../MysqlMultiActiveLeaseArbiterTest.java          |  4 ++--
 .../proc/EnforceDeadlineDagProcsTest.java          |  4 ++--
 .../orchestration/proc/KillDagProcTest.java        |  8 +++----
 .../orchestration/proc/LaunchDagProcTest.java      | 11 +++++----
 .../orchestration/proc/ReevaluateDagProcTest.java  | 15 ++++++------
 .../orchestration/proc/ResumeDagProcTest.java      |  2 +-
 32 files changed, 134 insertions(+), 144 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 9031f0031..2627f55a5 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -32,7 +32,6 @@ import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
@@ -71,7 +70,7 @@ public class DagActionStoreChangeMonitorTest {
 
   private final String FLOW_GROUP = "flowGroup";
   private final String FLOW_NAME = "flowName";
-  private final String FLOW_EXECUTION_ID = "123";
+  private final long FLOW_EXECUTION_ID = 123L;
   private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
   private int txidCounter = 0;
 
@@ -136,7 +135,7 @@ public class DagActionStoreChangeMonitorTest {
   @Test
   public void testProcessMessageWithHeartbeatAndNullDagAction() throws 
SpecNotFoundException {
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
-        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", 
null);
+        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", 
FLOW_EXECUTION_ID, null);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
     verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
     verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
@@ -227,17 +226,6 @@ public class DagActionStoreChangeMonitorTest {
 
   @Test (dependsOnMethods = "testProcessMessageWithDelete")
   public void testStartupSequenceHandlesFailures() throws Exception {
-    Config config = ConfigBuilder.create()
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
-        .build();
-    String flowGroup = "testFlowGroup";
-    String flowName = "testFlowName";
-    String jobName = "testJobName";
-    String flowExecutionId = "12345677";
-
     DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
 
     Config monitorConfig = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
@@ -263,12 +251,12 @@ public class DagActionStoreChangeMonitorTest {
    * Util to create a general DagActionStoreChange type event
    */
   private DagActionStoreChangeEvent 
createDagActionStoreChangeEvent(OperationType operationType,
-      String flowGroup, String flowName, String flowExecutionId, 
DagActionValue dagAction) {
+      String flowGroup, String flowName, long flowExecutionId, DagActionValue 
dagAction) {
     String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
     GenericStoreChangeEvent genericStoreChangeEvent =
         new GenericStoreChangeEvent(key, String.valueOf(txidCounter), 
System.currentTimeMillis(), operationType);
     txidCounter++;
-    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, flowExecutionId,
+    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, String.valueOf(flowExecutionId),
         DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
   }
 
@@ -276,15 +264,15 @@ public class DagActionStoreChangeMonitorTest {
    * Form a key for events using the flow identifiers
    * @return a key formed by adding an '_' delimiter between the flow 
identifiers
    */
-  private String getKeyForFlow(String flowGroup, String flowName, String 
flowExecutionId) {
+  private String getKeyForFlow(String flowGroup, String flowName, long 
flowExecutionId) {
     return flowGroup + "_" + flowName + "_" + flowExecutionId;
   }
 
   /**
    * Util to create wrapper around DagActionStoreChangeEvent
    */
-  private Kafka09ConsumerClient.Kafka09ConsumerRecord 
wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup, 
String flowName,
-      String flowExecutionId, DagActionValue dagAction) {
+  private Kafka09ConsumerClient.Kafka09ConsumerRecord 
wrapDagActionStoreChangeEvent(OperationType operationType,
+      String flowGroup, String flowName, long flowExecutionId, DagActionValue 
dagAction) {
     DagActionStoreChangeEvent eventToProcess = null;
     try {
       eventToProcess =
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 337e6e33e..b1496e5d4 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -95,7 +95,7 @@ public class KafkaAvroJobStatusMonitorTest {
   private String flowName = "myFlowName";
   private String jobGroup = "myJobGroup";
   private String jobName = "myJobName";
-  private String flowExecutionId = "1234";
+  private long flowExecutionId = 1234L;
   private String jobExecutionId = "1111";
   private String message = "https://myServer:8143/1234/1111";
   private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
@@ -707,7 +707,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Map<String, String> metadata = Maps.newHashMap();
     metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
this.flowGroup);
     metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
this.flowName);
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
String.valueOf(this.flowExecutionId));
     metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
     metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
this.jobGroup);
     metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, 
this.jobExecutionId);
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index bf7cf071b..d106c6e96 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -65,7 +65,7 @@ public class StreamingKafkaSpecExecutorTest extends 
KafkaTestBase {
   private static final String _TEST_DIR_PATH = 
"/tmp/StreamingKafkaSpecExecutorTest";
   private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
   String flowSpecUriString = "/flowgroup/flowname/spec";
-  Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
+  Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, 12345L);
   String specUriString = "/foo/bar/spec";
   Spec spec = initJobSpec(specUriString);
 
@@ -253,9 +253,9 @@ public class StreamingKafkaSpecExecutorTest extends 
KafkaTestBase {
         .build();
   }
 
-  private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String 
flowExecutionId) {
+  private static JobSpec initJobSpecWithFlowExecutionId(String specUri, long 
flowExecutionId) {
     Properties properties = new Properties();
-    properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId);
+    properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
String.valueOf(flowExecutionId));
     return JobSpec.builder(specUri)
         .withConfig(ConfigUtils.propertiesToConfig(properties))
         .withVersion("1")
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index a74b9fd0e..560e16ba5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -17,6 +17,17 @@
 
 package org.apache.gobblin.runtime.api;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -27,20 +38,13 @@ import com.linkedin.data.template.StringMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
+
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.FlowConfig;
@@ -146,14 +150,13 @@ public class FlowSpec implements Configurable, Spec {
    * @param key
    * @param value
    */
-  public synchronized void addProperty(String key, String value) {
+  public synchronized void addProperty(String key, Long value) {
     this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
     /* Make sure configAsProperties has been initialized. If it's just 
initialized, setting the property will be a
     redundant operation. However, if it already existed we need to update/add 
the key-value pair.
      */
     this.getConfigAsProperties();
-    this.configAsProperties.setProperty(key, value);
-
+    this.configAsProperties.setProperty(key, value.toString());
   }
 
   public void addCompilationError(String src, String dst, String errorMessage, 
int numberOfHops) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
index 74b534f22..3621361a9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
@@ -16,7 +16,7 @@
  */
 
 package org.apache.gobblin.runtime.spec_executorInstance;
-import com.typesafe.config.Config;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -24,7 +24,11 @@ import java.net.URI;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
+
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -70,7 +74,7 @@ public class LocalFsSpecProducer implements 
SpecProducer<Spec> {
   private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
     if (spec instanceof JobSpec) {
       // format the JobSpec to have file of <flowGroup>_<flowName>.job
-      String flowExecutionId = ((JobSpec) 
spec).getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      long flowExecutionId = ((JobSpec) 
spec).getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       String jobFileName = getJobFileName(spec.getUri(), flowExecutionId);
       try (
         FileOutputStream fStream = new FileOutputStream(this.specProducerPath 
+ File.separatorChar + jobFileName);
@@ -117,7 +121,7 @@ public class LocalFsSpecProducer implements 
SpecProducer<Spec> {
     throw new UnsupportedOperationException();
   }
 
-  public static String getJobFileName(URI specUri, String flowExecutionId) {
+  public static String getJobFileName(URI specUri, long flowExecutionId) {
     String[] uriTokens = specUri.getPath().split("/");
     return String.join("_", uriTokens) + "_" + flowExecutionId + ".job";
   }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
index 539f42d53..3581abc18 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
@@ -17,15 +17,18 @@
 
 package org.apache.gobblin.runtime.api;
 
-import com.typesafe.config.Config;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Properties;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.FlowId;
+
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowId;
+
 
 public class FlowSpecTest {
 
@@ -38,7 +41,7 @@ public class FlowSpecTest {
   public void testAddProperty() throws URISyntaxException {
     String flowGroup = "myGroup";
     String flowName = "myName";
-    String flowExecutionId = "1234";
+    long flowExecutionId = 1234L;
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
     URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
 
@@ -52,13 +55,13 @@ public class FlowSpecTest {
     flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId);
 
     Properties updatedProperties = flowSpec.getConfigAsProperties();
-    
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
 flowExecutionId);
+    
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
 String.valueOf(flowExecutionId));
     
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY),
 flowGroup);
     
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY),
 flowName);
     
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
 "true");
 
     Config updatedConfig = flowSpec.getConfig();
-    
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
 flowExecutionId);
+    
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
 String.valueOf(flowExecutionId));
     
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), 
flowGroup);
     
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), 
flowName);
     
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
 "true");
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 588db58d6..add618616 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -92,7 +92,7 @@ public class DagActionReminderScheduler {
       String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
       String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      String flowExecutionId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      long flowExecutionId = 
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
 
       log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index cdaf56c8a..3e0735bd3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -46,16 +46,16 @@ public interface DagActionStore {
   class DagAction {
     final String flowGroup;
     final String flowName;
-    final String flowExecutionId;
+    final long flowExecutionId;
     final String jobName;
     final DagActionType dagActionType;
     final boolean isReminder;
 
-    public DagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType) {
+    public DagAction(String flowGroup, String flowName, long flowExecutionId, 
String jobName, DagActionType dagActionType) {
       this(flowGroup, flowName, flowExecutionId, jobName, dagActionType, 
false);
     }
 
-    public static DagAction forFlow(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) {
+    public static DagAction forFlow(String flowGroup, String flowName, long 
flowExecutionId, DagActionType dagActionType) {
       return new DagAction(flowGroup, flowName, flowExecutionId, 
NO_JOB_NAME_DEFAULT, dagActionType);
     }
 
@@ -67,16 +67,14 @@ public interface DagActionStore {
      *   Replace flow execution id with agreed upon event time to easily track 
the flow
      */
     public DagAction updateFlowExecutionId(long eventTimeMillis) {
-      return new DagAction(this.getFlowGroup(), this.getFlowName(),
-          String.valueOf(eventTimeMillis), this.getJobName(), 
this.getDagActionType());
+      return new DagAction(this.getFlowGroup(), this.getFlowName(), 
eventTimeMillis, this.getJobName(), this.getDagActionType());
     }
 
     /**
      * Creates and returns a {@link DagNodeId} for this DagAction.
      */
     public DagNodeId getDagNodeId() {
-      return new DagNodeId(this.flowGroup, this.flowName,
-          Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
+      return new DagNodeId(this.flowGroup, this.flowName, 
this.flowExecutionId, this.flowGroup, this.jobName);
     }
 
     /**
@@ -97,7 +95,7 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  boolean exists(String flowGroup, String flowName, String flowExecutionId, 
String jobName, DagActionType dagActionType) throws IOException, SQLException;
+  boolean exists(String flowGroup, String flowName, long flowExecutionId, 
String jobName, DagActionType dagActionType) throws IOException, SQLException;
 
   /**
    * Check if an action exists in dagAction store by flow group, flow name, 
and flow execution id, it assumes jobName is
@@ -108,7 +106,7 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  boolean exists(String flowGroup, String flowName, String flowExecutionId, 
DagActionType dagActionType) throws IOException, SQLException;
+  boolean exists(String flowGroup, String flowName, long flowExecutionId, 
DagActionType dagActionType) throws IOException, SQLException;
 
   /** Persist the {@link DagAction} in {@link DagActionStore} for durability */
   default void addDagAction(DagAction dagAction) throws IOException {
@@ -129,7 +127,7 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType) throws 
IOException;
+  void addJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName, DagActionType dagActionType) throws 
IOException;
 
   /**
    * Persist the dag action in {@link DagActionStore} for durability. This 
method assumes an empty jobName.
@@ -139,7 +137,7 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  default void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) throws IOException {
+  default void addFlowDagAction(String flowGroup, String flowName, long 
flowExecutionId, DagActionType dagActionType) throws IOException {
     addDagAction(DagAction.forFlow(flowGroup, flowName, flowExecutionId, 
dagActionType));
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 6a6140d80..7cfe0ff94 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -206,7 +206,7 @@ public interface DagManagementStateStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  boolean existsJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+  boolean existsJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
       DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException;
 
   /**
@@ -218,7 +218,7 @@ public interface DagManagementStateStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  boolean existsFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId,
+  boolean existsFlowDagAction(String flowGroup, String flowName, long 
flowExecutionId,
       DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException;
 
   /** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} 
for durability */
@@ -240,7 +240,7 @@ public interface DagManagementStateStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+  void addJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
       DagActionStore.DagActionType dagActionType) throws IOException;
 
   /**
@@ -251,7 +251,7 @@ public interface DagManagementStateStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  default void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId,
+  default void addFlowDagAction(String flowGroup, String flowName, long 
flowExecutionId,
       DagActionStore.DagActionType dagActionType) throws IOException {
     addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName, 
flowExecutionId, dagActionType));
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5c03b35f6..ee80cbb65 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -173,9 +173,9 @@ public class DagManager extends AbstractIdleService {
   public static class DagId {
     String flowGroup;
     String flowName;
-    String flowExecutionId;
+    long flowExecutionId;
 
-    public DagId(String flowGroup, String flowName, String flowExecutionId) {
+    public DagId(String flowGroup, String flowName, long flowExecutionId) {
       this.flowGroup = flowGroup;
       this.flowName = flowName;
       this.flowExecutionId = flowExecutionId;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 92d81c7be..d4b5233a9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -76,7 +76,7 @@ public class DagManagerUtils {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName =  jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    String flowExecutionId = 
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
     String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
     return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
   }
@@ -117,7 +117,7 @@ public class DagManagerUtils {
   private static DagManager.DagId generateDagId(Config jobConfig) {
     String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    String flowExecutionId = 
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
 
     return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
   }
@@ -127,10 +127,6 @@ public class DagManagerUtils {
   }
 
   public static DagManager.DagId generateDagId(String flowGroup, String 
flowName, long flowExecutionId) {
-    return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
-  }
-
-  public static DagManager.DagId generateDagId(String flowGroup, String 
flowName, String flowExecutionId) {
     return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index da50c5055..862c73f96 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -270,19 +270,19 @@ public class MostlyMySqlDagManagementStateStore 
implements DagManagementStateSto
   }
 
   @Override
-  public boolean existsJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+  public boolean existsJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
       DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException {
     return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
   }
 
   @Override
-  public boolean existsFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId,
+  public boolean existsFlowDagAction(String flowGroup, String flowName, long 
flowExecutionId,
       DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException {
     return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
dagActionType);
   }
 
   @Override
-  public void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+  public void addJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
       DagActionStore.DagActionType dagActionType) throws IOException {
     this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index ee1fc5173..12d786966 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -97,12 +97,12 @@ public class MysqlDagActionStore implements DagActionStore {
   }
 
   @Override
-  public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType) throws 
IOException, SQLException {
+  public boolean exists(String flowGroup, String flowName, long 
flowExecutionId, String jobName, DagActionType dagActionType) throws 
IOException, SQLException {
     return 
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
       int i = 0;
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
-      existStatement.setString(++i, flowExecutionId);
+      existStatement.setString(++i, String.valueOf(flowExecutionId));
       existStatement.setString(++i, jobName);
       existStatement.setString(++i, dagActionType.toString());
       ResultSet rs = null;
@@ -122,19 +122,19 @@ public class MysqlDagActionStore implements 
DagActionStore {
   }
 
   @Override
-  public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) throws IOException, SQLException {
+  public boolean exists(String flowGroup, String flowName, long 
flowExecutionId, DagActionType dagActionType) throws IOException, SQLException {
     return exists(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, 
dagActionType);
   }
 
   @Override
-  public void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionType dagActionType)
+  public void addJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName, DagActionType dagActionType)
       throws IOException {
     dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
     try {
       int i = 0;
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
-      insertStatement.setString(++i, flowExecutionId);
+      insertStatement.setString(++i, String.valueOf(flowExecutionId));
       insertStatement.setString(++i, jobName);
       insertStatement.setString(++i, dagActionType.toString());
       return insertStatement.executeUpdate();
@@ -151,7 +151,7 @@ public class MysqlDagActionStore implements DagActionStore {
       int i = 0;
       deleteStatement.setString(++i, dagAction.getFlowGroup());
       deleteStatement.setString(++i, dagAction.getFlowName());
-      deleteStatement.setString(++i, dagAction.getFlowExecutionId());
+      deleteStatement.setString(++i, 
String.valueOf(dagAction.getFlowExecutionId()));
       deleteStatement.setString(++i, dagAction.getJobName());
       deleteStatement.setString(++i, dagAction.getDagActionType().toString());
       int result = deleteStatement.executeUpdate();
@@ -163,17 +163,17 @@ public class MysqlDagActionStore implements 
DagActionStore {
   }
 
   // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
-  private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, String jobName, DagActionType dagActionType, 
ExponentialBackoff exponentialBackoff)
+  private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
long flowExecutionId, String jobName, DagActionType dagActionType, 
ExponentialBackoff exponentialBackoff)
       throws IOException, SQLException {
     return 
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, 
tableName), getStatement -> {
       int i = 0;
       getStatement.setString(++i, flowGroup);
       getStatement.setString(++i, flowName);
-      getStatement.setString(++i, flowExecutionId);
+      getStatement.setString(++i, String.valueOf(flowExecutionId));
       getStatement.setString(++i, dagActionType.toString());
       try (ResultSet rs = getStatement.executeQuery()) {
         if (rs.next()) {
-          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), rs.getString(4), DagActionType.valueOf(rs.getString(5)));
+          return new DagAction(rs.getString(1), rs.getString(2), 
rs.getLong(3), rs.getString(4), DagActionType.valueOf(rs.getString(5)));
         } else if (exponentialBackoff.awaitNextRetryIfAvailable()) {
           return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType, exponentialBackoff);
         } else {
@@ -194,7 +194,7 @@ public class MysqlDagActionStore implements DagActionStore {
       HashSet<DagAction> result = new HashSet<>();
       try (ResultSet rs = getAllStatement.executeQuery()) {
         while (rs.next()) {
-          result.add(new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), rs.getString(4), DagActionType.valueOf(rs.getString(5))));
+          result.add(new DagAction(rs.getString(1), rs.getString(2), 
rs.getLong(3), rs.getString(4), DagActionType.valueOf(rs.getString(5))));
         }
         return result;
       } catch (SQLException e) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index d82d4f9a7..25caaf224 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -219,7 +219,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         DagActionStore.DagAction launchDagAction = 
DagActionStore.DagAction.forFlow(
             flowGroup,
             flowName,
-            String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec)),
+            FlowUtils.getOrCreateFlowExecutionId(flowSpec),
             DagActionStore.DagActionType.LAUNCH
         );
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index c90907d23..b52f361dc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -74,7 +74,7 @@ public class DagProcUtils {
       for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
         JobExecutionPlan jobExecutionPlan = dagNode.getValue();
         
dagManagementStateStore.addJobDagAction(jobExecutionPlan.getFlowGroup(), 
jobExecutionPlan.getFlowName(),
-            String.valueOf(jobExecutionPlan.getFlowExecutionId()), 
jobExecutionPlan.getJobName(), DagActionStore.DagActionType.REEVALUATE);
+            jobExecutionPlan.getFlowExecutionId(), 
jobExecutionPlan.getJobName(), DagActionStore.DagActionType.REEVALUATE);
       }
     }
   }
@@ -194,7 +194,7 @@ public class DagProcUtils {
   private static void 
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
       throws IOException {
     dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(), 
dagNode.getValue().getFlowName(),
-        String.valueOf(dagNode.getValue().getFlowExecutionId()), 
dagNode.getValue().getJobName(),
+        dagNode.getValue().getFlowExecutionId(), 
dagNode.getValue().getJobName(),
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 4359a6d88..242290656 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -180,7 +180,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
 
   private void 
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore) {
     DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
-        getDagNodeId().getFlowName(), 
String.valueOf(getDagNodeId().getFlowExecutionId()),
+        getDagNodeId().getFlowName(), getDagNodeId().getFlowExecutionId(),
         DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
     log.info("Deleting reminder trigger and dag action {}", 
enforceFlowFinishDeadlineDagAction);
     // todo - add metrics
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 1b5e21083..4fb0fd5fb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -71,12 +71,12 @@ public class 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
   protected void addDagAction(String flowGroup, String flowName, Long 
flowExecutionId, DagActionStore.DagActionType actionType) {
     try {
       // If an existing resume request is still pending then do not accept 
this request
-      if (this.dagManagementStateStore.existsFlowDagAction(flowGroup, 
flowName, flowExecutionId.toString(), actionType)) {
+      if (this.dagManagementStateStore.existsFlowDagAction(flowGroup, 
flowName, flowExecutionId, actionType)) {
         this.throwErrorResponse("There is already a pending " + actionType + " 
action for this flow. Please wait to resubmit and wait "
             + "for action to be completed.", HttpStatus.S_409_CONFLICT);
         return;
       }
-      this.dagManagementStateStore.addFlowDagAction(flowGroup, flowName, 
flowExecutionId.toString(), actionType);
+      this.dagManagementStateStore.addFlowDagAction(flowGroup, flowName, 
flowExecutionId, actionType);
     } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add %s action for flow %s %s %s to dag 
action store due to:", actionType, flowGroup,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 7f566a56b..5706ad9cc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -199,7 +199,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     String operation = 
value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
-    String flowExecutionId = value.getFlowExecutionId();
+    long flowExecutionId = Long.parseLong(value.getFlowExecutionId());
     String jobName = value.getJobName();
 
     produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
@@ -259,12 +259,10 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction 
change ({}) received for flow: {}",
         dagAction.getDagActionType(), dagAction);
     if 
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.RESUME)) {
-      dagManager.handleResumeFlowRequest(dagAction.getFlowGroup(), 
dagAction.getFlowName(),
-          Long.parseLong(dagAction.getFlowExecutionId()));
+      dagManager.handleResumeFlowRequest(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
       this.resumesInvoked.mark();
     } else if 
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.KILL)) {
-      dagManager.handleKillFlowRequest(dagAction.getFlowGroup(), 
dagAction.getFlowName(),
-          Long.parseLong(dagAction.getFlowExecutionId()));
+      dagManager.handleKillFlowRequest(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
       this.killsInvoked.mark();
     } else if 
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.LAUNCH)) {
       // If multi-active scheduler is NOT turned on we should not receive 
these type of events
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 6e60392ef..7caec9a4c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -218,7 +218,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
           String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
           String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-          String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+          long flowExecutionId = 
jobStatus.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
           String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
           String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
           String storeName = jobStatusStoreName(flowGroup, flowName);
@@ -260,9 +260,10 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     }
   }
 
-  private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore, String flowGroup, String flowName, String 
flowExecutionId, String jobName) {
+  private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore, String flowGroup,
+      String flowName, long flowExecutionId, String jobName) {
     DagActionStore.DagAction enforceStartDeadlineDagAction = new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), jobName, 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+        flowExecutionId, jobName, 
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
     log.info("Deleting reminder trigger and dag action {}", 
enforceStartDeadlineDagAction);
     // todo - add metrics
 
@@ -293,7 +294,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       }
       String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
       String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-      String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+      long flowExecutionId = 
jobStatus.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
       String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
       String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
       String storeName = jobStatusStoreName(flowGroup, flowName);
@@ -404,12 +405,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     return new org.apache.gobblin.configuration.State(mergedState);
   }
 
-  public static String jobStatusTableName(String flowExecutionId, String 
jobGroup, String jobName) {
-    return 
Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId,
 jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
-  }
-
   public static String jobStatusTableName(long flowExecutionId, String 
jobGroup, String jobName) {
-    return jobStatusTableName(String.valueOf(flowExecutionId), jobGroup, 
jobName);
+    return 
Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId,
 jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
   }
 
   public static String jobStatusStoreName(String flowGroup, String flowName) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 887ee5827..8a9346571 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -61,7 +61,7 @@ public class LocalFsJobStatusRetriever extends 
JobStatusRetriever {
     // Local FS has no monitor to update job state yet, for now check if 
standalone is completed with job, and mark as done
     // Otherwise the job is pending
     try {
-      String fileName = LocalFsSpecProducer.getJobFileName(new 
URI(File.separatorChar + flowGroup + File.separatorChar + flowName), 
String.valueOf(flowExecutionId)) + suffix;
+      String fileName = LocalFsSpecProducer.getJobFileName(new 
URI(File.separatorChar + flowGroup + File.separatorChar + flowName), 
flowExecutionId) + suffix;
       return new File(this.specProducerPath + File.separatorChar + 
fileName).exists();
     } catch (URISyntaxException e) {
       log.error("URISyntaxException occurred when retrieving job status for 
flow: {},{}", flowGroup, flowName, e);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index cb2ac540b..9b634f9fe 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -29,16 +29,18 @@ import org.quartz.spi.OperableTrigger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Joiner;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 
 
 public class DagActionReminderSchedulerTest {
   String flowGroup = "fg";
   String flowName = "fn";
-  String flowExecutionId = "123";
+  long flowExecutionId = 123L;
   String jobName = "jn";
-  String expectedKey =  String.join(".", flowGroup, flowName, flowExecutionId, 
jobName,
-      String.valueOf(DagActionStore.DagActionType.LAUNCH));
+  String expectedKey =  Joiner.on(".").join(flowGroup, flowName, 
flowExecutionId, jobName,
+      DagActionStore.DagActionType.LAUNCH.name());
   DagActionStore.DagAction launchDagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
       DagActionStore.DagActionType.LAUNCH);
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 8d4ff795a..205f02dbc 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -104,7 +104,7 @@ public class DagManagementTaskStreamImplTest {
     statuses that should cause the next() method to continue polling for tasks 
before finally providing the
      LeaseObtainedStatus to the taskStream to break its loop and return a 
newly created dagTask
     */
-    DagActionStore.DagAction launchAction = new DagActionStore.DagAction("fg", 
"fn", "12345", "jn", DagActionStore.DagActionType.LAUNCH);
+    DagActionStore.DagAction launchAction = new DagActionStore.DagAction("fg", 
"fn", 12345L, "jn", DagActionStore.DagActionType.LAUNCH);
     dagManagementTaskStream.addDagAction(launchAction);
     dagManagementTaskStream.addDagAction(launchAction);
     dagManagementTaskStream.addDagAction(launchAction);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 88a572d73..342b379f4 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -71,8 +71,8 @@ public class DagManagerFlowTest {
   private static final String TABLE = "dag_action_store";
   private static final String flowGroup = "testFlowGroup";
   private static final String flowName = "testFlowName";
-  private static final String flowExecutionId = "12345677";
-  private static final String flowExecutionId_2 = "12345678";
+  private static final long flowExecutionId = 12345677L;
+  private static final long flowExecutionId_2 = 12345678L;
   private ITestMetastoreDatabase testDb;
   private DagActionStore dagActionStore;
 
@@ -136,9 +136,9 @@ public class DagManagerFlowTest {
 
     // mock add spec
     // for very first dag to be added, add dag action to store and check its 
deleted by the addDag call
-    dagManager.getDagActionStore().get().addFlowDagAction("group0", "flow0", 
Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH);
+    dagManager.getDagActionStore().get().addFlowDagAction("group0", "flow0", 
flowExecutionId1, DagActionStore.DagActionType.LAUNCH);
     dagManager.addDag(dag1, true, true);
-    Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0", 
"flow0", Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH));
+    Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0", 
"flow0", flowExecutionId1, DagActionStore.DagActionType.LAUNCH));
     dagManager.addDag(dag2, true, true);
     dagManager.addDag(dag3, true, true);
 
@@ -345,15 +345,14 @@ public class DagManagerFlowTest {
     String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
     // Add kill action to action store and call kill
-    dagActionStore.addFlowDagAction(flowGroup, flowName, 
String.valueOf(flowExecutionId), DagActionStore.DagActionType.KILL);
+    dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionType.KILL);
     dagManager.handleKillFlowRequest(flowGroup, flowName, flowExecutionId);
 
     // Check that the kill dag action is removed
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
         assertTrue(input -> {
       try {
-        return !dagActionStore.exists(flowGroup, flowName, 
String.valueOf(flowExecutionId),
-            DagActionStore.DagActionType.KILL);
+        return !dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionType.KILL);
       } catch (IOException | SQLException e) {
         throw new RuntimeException(e);
       }
@@ -362,13 +361,13 @@ public class DagManagerFlowTest {
 
 
     // Add resume action to action store and call resume
-    dagActionStore.addFlowDagAction(flowGroup, flowName, 
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
+    dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionType.RESUME);
     dagManager.handleResumeFlowRequest(flowGroup, flowName, flowExecutionId);
 
     // Check that the resume dag action is removed
     
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(input 
-> {
           try {
-            return !dagActionStore.exists(flowGroup, flowName, 
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
+            return !dagActionStore.exists(flowGroup, flowName, 
flowExecutionId, DagActionStore.DagActionType.RESUME);
           } catch (IOException | SQLException e) {
             throw new RuntimeException(e);
           }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 92345122b..b3a924454 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -129,9 +129,9 @@ public class DagProcessingEngineTest {
         throw new RuntimeException("Simulating an exception to stop the 
thread!");
       }
       if (i % FAILING_DAGS_FREQUENCY == 0 ) {
-        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, "jn-" + i, DagActionStore.DagActionType.LAUNCH), true);
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), true);
       } else {
-        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, "jn-" + i, DagActionStore.DagActionType.LAUNCH), false);
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), false);
       }
     }
   }
@@ -160,7 +160,7 @@ public class DagProcessingEngineTest {
 
     @Override
     public DagManager.DagId getDagId() {
-      return new DagManager.DagId("fg", "fn", "12345");
+      return new DagManager.DagId("fg", "fn", 12345L);
     }
 
     @Override
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index b4feeed84..197ceda76 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -35,7 +35,7 @@ public class FlowLaunchHandlerTest {
   String cronExpressionSuffix = 
truncateFirstTwoFieldsOfCronExpression(cronExpression);
   int schedulerBackOffMillis = 10;
   DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction("flowName", "flowGroup",
-      String.valueOf(flowExecutionId), "jobName", 
DagActionStore.DagActionType.LAUNCH);
+      flowExecutionId, "jobName", DagActionStore.DagActionType.LAUNCH);
   LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
       new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit, 
minimumLingerDurationMillis);
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 0c39abd16..faf6cd28b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -42,9 +42,9 @@ public class MysqlDagActionStoreTest {
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
   private static final String jobName_2 = "testJobName2";
-  private static final String flowExecutionId = "12345677";
-  private static final String flowExecutionId_2 = "12345678";
-  private static final String flowExecutionId_3 = "12345679";
+  private static final long flowExecutionId = 12345677L;
+  private static final long flowExecutionId_2 = 12345678L;
+  private static final long flowExecutionId_3 = 12345679L;
   private ITestMetastoreDatabase testDb;
   private MysqlDagActionStore mysqlDagActionStore;
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 607d8f3b0..bcb208fa7 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -51,7 +51,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String flowGroup2 = "testFlowGroup2";
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
-  private static final String flowExecutionId = "12345677";
+  private static final long flowExecutionId = 12345677L;
   // Dag actions with the same flow info but different flow action types are 
considered unique
   private static DagActionStore.DagAction launchDagAction =
       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
@@ -110,7 +110,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
         firstObtainedStatus.getLeaseAcquisitionTimestamp());
     Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
-        new DagActionStore.DagAction(flowGroup, flowName, 
String.valueOf(firstObtainedStatus.getEventTimeMillis()),
+        new DagActionStore.DagAction(flowGroup, flowName, 
firstObtainedStatus.getEventTimeMillis(),
             jobName, DagActionStore.DagActionType.LAUNCH)));
 
     // Verify that different DagAction types for the same flow can have leases 
at the same time
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 52d80469f..0fb261515 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -94,7 +94,7 @@ public class EnforceDeadlineDagProcsTest {
     dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
 
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
-        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
             "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
     enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
 
@@ -130,7 +130,7 @@ public class EnforceDeadlineDagProcsTest {
     dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
is in running state
 
     EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new 
EnforceFlowFinishDeadlineDagProc(
-        new EnforceFlowFinishDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+        new EnforceFlowFinishDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
             "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
     enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index b7c86eebe..0976f6c59 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -93,7 +93,7 @@ public class KillDagProcTest {
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow1",
-        String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
+        flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.LAUNCH),
         null, this.dagManagementStateStore), flowCompilationValidationHelper);
     launchDagProc.process(this.dagManagementStateStore);
 
@@ -106,7 +106,7 @@ public class KillDagProcTest {
     }).collect(Collectors.toList());
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow1",
-       String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.KILL),
+       flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.KILL),
         null, this.dagManagementStateStore));
     killDagProc.process(this.dagManagementStateStore);
 
@@ -136,7 +136,7 @@ public class KillDagProcTest {
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow2",
-        String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
+        flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.LAUNCH),
         null, this.dagManagementStateStore), flowCompilationValidationHelper);
     launchDagProc.process(this.dagManagementStateStore);
 
@@ -149,7 +149,7 @@ public class KillDagProcTest {
     }).collect(Collectors.toList());
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow2",
-        String.valueOf(flowExecutionId), "job2", 
DagActionStore.DagActionType.KILL),
+        flowExecutionId, "job2", DagActionStore.DagActionType.KILL),
         null, this.dagManagementStateStore));
     killDagProc.process(this.dagManagementStateStore);
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index b4c5e0109..1e7580e7f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -59,6 +59,7 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -89,8 +90,8 @@ public class LaunchDagProcTest {
   public void launchDag() throws IOException, InterruptedException, 
URISyntaxException, ExecutionException {
     String flowGroup = "fg";
     String flowName = "fn";
-    String flowExecutionId = "12345";
-    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", 
Long.parseLong(flowExecutionId),
+    long flowExecutionId = 12345L;
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
         DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", 
ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName)));
@@ -117,14 +118,14 @@ public class LaunchDagProcTest {
     Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
 
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
-        .addFlowDagAction(any(), any(), any(), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
+        .addFlowDagAction(any(), any(), anyLong(), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
   }
 
   @Test
   public void launchDagWithMultipleParallelJobs() throws IOException, 
InterruptedException, URISyntaxException {
     String flowGroup = "fg";
     String flowName = "fn";
-    String flowExecutionId = "12345";
+    long flowExecutionId = 12345L;
     Dag<JobExecutionPlan> dag = 
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
         DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5", 
ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
@@ -150,7 +151,7 @@ public class LaunchDagProcTest {
   //    /   \
   //  D5     D6
 
-  public static Dag<JobExecutionPlan> 
buildDagWithMultipleNodesAtDifferentLevels(String id, String flowExecutionId,
+  public static Dag<JobExecutionPlan> 
buildDagWithMultipleNodesAtDifferentLevels(String id, long flowExecutionId,
       String flowFailureOption, String proxyUser, Config additionalConfig) 
throws URISyntaxException {
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index c5ac11f69..057788173 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -57,6 +57,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -120,7 +121,7 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
 
     long addSpecCount = specProducers.stream()
@@ -175,7 +176,7 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
 
     // no new job to launch for this one job flow
@@ -214,7 +215,7 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null,
+        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
         dagManagementStateStore));
     reEvaluateDagProc.process(dagManagementStateStore);
 
@@ -228,7 +229,7 @@ public class ReevaluateDagProcTest {
     // no job's state is deleted because that happens when the job finishes 
triggered the reevaluate dag proc
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagNodeState(any(), any());
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(any());
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), any(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagActionReminderScheduler, 
Mockito.never()).unscheduleReminderJob(any());
   }
@@ -236,7 +237,7 @@ public class ReevaluateDagProcTest {
   @Test
   public void testMultipleNextJobToRun() throws Exception {
     String flowName = "fn4";
-    Dag<JobExecutionPlan> dag = 
LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1", 
String.valueOf(flowExecutionId),
+    Dag<JobExecutionPlan> dag = 
LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1", 
flowExecutionId,
         DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5", 
ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
@@ -260,7 +261,7 @@ public class ReevaluateDagProcTest {
 
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), "job3", 
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+        flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE), 
null, dagManagementStateStore));
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     // process 4th job
     reEvaluateDagProc.process(dagManagementStateStore);
@@ -268,7 +269,7 @@ public class ReevaluateDagProcTest {
     int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th 
job passes, i.e. 5th and 6th job
     // parallel jobs are launched through reevaluate dag action
     Mockito.verify(dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
-        .addJobDagAction(eq(flowGroup), eq(flowName), 
eq(String.valueOf(flowExecutionId)), any(), 
eq(DagActionStore.DagActionType.REEVALUATE));
+        .addJobDagAction(eq(flowGroup), eq(flowName), eq(flowExecutionId), 
any(), eq(DagActionStore.DagActionType.REEVALUATE));
 
     // when there are parallel jobs to launch, they are not directly sent to 
spec producers, instead reevaluate dag action is created
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 08fff6fe1..c68bde24e 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -96,7 +96,7 @@ public class ResumeDagProcTest {
     
doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        String.valueOf(flowExecutionId), 
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
+        flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.RESUME),
         null, this.dagManagementStateStore));
     resumeDagProc.process(this.dagManagementStateStore);
 

Reply via email to