This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1258cc643 [GOBBLIN-2084] change flow execution id to long everywhere
it is possible (#3967)
1258cc643 is described below
commit 1258cc643de79772a2bb8bb2f9c411e59641b22e
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 11 19:25:12 2024 -0700
[GOBBLIN-2084] change flow execution id to long everywhere it is possible
(#3967)
* change flow execution id to long everywhere it is possible
---
.../runtime/DagActionStoreChangeMonitorTest.java | 26 ++++++---------------
.../runtime/KafkaAvroJobStatusMonitorTest.java | 4 ++--
.../service/StreamingKafkaSpecExecutorTest.java | 6 ++---
.../org/apache/gobblin/runtime/api/FlowSpec.java | 27 ++++++++++++----------
.../spec_executorInstance/LocalFsSpecProducer.java | 10 +++++---
.../apache/gobblin/runtime/api/FlowSpecTest.java | 15 +++++++-----
.../orchestration/DagActionReminderScheduler.java | 2 +-
.../modules/orchestration/DagActionStore.java | 20 ++++++++--------
.../orchestration/DagManagementStateStore.java | 8 +++----
.../service/modules/orchestration/DagManager.java | 4 ++--
.../modules/orchestration/DagManagerUtils.java | 8 ++-----
.../MostlyMySqlDagManagementStateStore.java | 6 ++---
.../modules/orchestration/MysqlDagActionStore.java | 20 ++++++++--------
.../modules/orchestration/Orchestrator.java | 2 +-
.../modules/orchestration/proc/DagProcUtils.java | 4 ++--
.../orchestration/proc/ReevaluateDagProc.java | 2 +-
...lowExecutionResourceHandlerWithWarmStandby.java | 4 ++--
.../monitoring/DagActionStoreChangeMonitor.java | 8 +++----
.../service/monitoring/KafkaJobStatusMonitor.java | 15 +++++-------
.../monitoring/LocalFsJobStatusRetriever.java | 2 +-
.../DagActionReminderSchedulerTest.java | 8 ++++---
.../DagManagementTaskStreamImplTest.java | 2 +-
.../modules/orchestration/DagManagerFlowTest.java | 17 +++++++-------
.../orchestration/DagProcessingEngineTest.java | 6 ++---
.../orchestration/FlowLaunchHandlerTest.java | 2 +-
.../orchestration/MysqlDagActionStoreTest.java | 6 ++---
.../MysqlMultiActiveLeaseArbiterTest.java | 4 ++--
.../proc/EnforceDeadlineDagProcsTest.java | 4 ++--
.../orchestration/proc/KillDagProcTest.java | 8 +++----
.../orchestration/proc/LaunchDagProcTest.java | 11 +++++----
.../orchestration/proc/ReevaluateDagProcTest.java | 15 ++++++------
.../orchestration/proc/ResumeDagProcTest.java | 2 +-
32 files changed, 134 insertions(+), 144 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 9031f0031..2627f55a5 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -32,7 +32,6 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
@@ -71,7 +70,7 @@ public class DagActionStoreChangeMonitorTest {
private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
- private final String FLOW_EXECUTION_ID = "123";
+ private final long FLOW_EXECUTION_ID = 123L;
private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
private int txidCounter = 0;
@@ -136,7 +135,7 @@ public class DagActionStoreChangeMonitorTest {
@Test
public void testProcessMessageWithHeartbeatAndNullDagAction() throws
SpecNotFoundException {
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
- wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "",
null);
+ wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "",
FLOW_EXECUTION_ID, null);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
@@ -227,17 +226,6 @@ public class DagActionStoreChangeMonitorTest {
@Test (dependsOnMethods = "testProcessMessageWithDelete")
public void testStartupSequenceHandlesFailures() throws Exception {
- Config config = ConfigBuilder.create()
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
- .build();
- String flowGroup = "testFlowGroup";
- String flowName = "testFlowName";
- String jobName = "testJobName";
- String flowExecutionId = "12345677";
-
DagManagementStateStore dagManagementStateStore =
mock(DagManagementStateStore.class);
Config monitorConfig =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
@@ -263,12 +251,12 @@ public class DagActionStoreChangeMonitorTest {
* Util to create a general DagActionStoreChange type event
*/
private DagActionStoreChangeEvent
createDagActionStoreChangeEvent(OperationType operationType,
- String flowGroup, String flowName, String flowExecutionId,
DagActionValue dagAction) {
+ String flowGroup, String flowName, long flowExecutionId, DagActionValue
dagAction) {
String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter),
System.currentTimeMillis(), operationType);
txidCounter++;
- return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, flowExecutionId,
+ return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, String.valueOf(flowExecutionId),
DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
}
@@ -276,15 +264,15 @@ public class DagActionStoreChangeMonitorTest {
* Form a key for events using the flow identifiers
* @return a key formed by adding an '_' delimiter between the flow
identifiers
*/
- private String getKeyForFlow(String flowGroup, String flowName, String
flowExecutionId) {
+ private String getKeyForFlow(String flowGroup, String flowName, long
flowExecutionId) {
return flowGroup + "_" + flowName + "_" + flowExecutionId;
}
/**
* Util to create wrapper around DagActionStoreChangeEvent
*/
- private Kafka09ConsumerClient.Kafka09ConsumerRecord
wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup,
String flowName,
- String flowExecutionId, DagActionValue dagAction) {
+ private Kafka09ConsumerClient.Kafka09ConsumerRecord
wrapDagActionStoreChangeEvent(OperationType operationType,
+ String flowGroup, String flowName, long flowExecutionId, DagActionValue
dagAction) {
DagActionStoreChangeEvent eventToProcess = null;
try {
eventToProcess =
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 337e6e33e..b1496e5d4 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -95,7 +95,7 @@ public class KafkaAvroJobStatusMonitorTest {
private String flowName = "myFlowName";
private String jobGroup = "myJobGroup";
private String jobName = "myJobName";
- private String flowExecutionId = "1234";
+ private long flowExecutionId = 1234L;
private String jobExecutionId = "1111";
private String message = "https://myServer:8143/1234/1111";
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
@@ -707,7 +707,7 @@ public class KafkaAvroJobStatusMonitorTest {
Map<String, String> metadata = Maps.newHashMap();
metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
this.flowGroup);
metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
this.flowName);
- metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
this.flowExecutionId);
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
String.valueOf(this.flowExecutionId));
metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
this.jobGroup);
metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD,
this.jobExecutionId);
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index bf7cf071b..d106c6e96 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -65,7 +65,7 @@ public class StreamingKafkaSpecExecutorTest extends
KafkaTestBase {
private static final String _TEST_DIR_PATH =
"/tmp/StreamingKafkaSpecExecutorTest";
private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
String flowSpecUriString = "/flowgroup/flowname/spec";
- Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
+ Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, 12345L);
String specUriString = "/foo/bar/spec";
Spec spec = initJobSpec(specUriString);
@@ -253,9 +253,9 @@ public class StreamingKafkaSpecExecutorTest extends
KafkaTestBase {
.build();
}
- private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String
flowExecutionId) {
+ private static JobSpec initJobSpecWithFlowExecutionId(String specUri, long
flowExecutionId) {
Properties properties = new Properties();
- properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId);
+ properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
String.valueOf(flowExecutionId));
return JobSpec.builder(specUri)
.withConfig(ConfigUtils.propertiesToConfig(properties))
.withVersion("1")
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index a74b9fd0e..560e16ba5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -17,6 +17,17 @@
package org.apache.gobblin.runtime.api;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -27,20 +38,13 @@ import com.linkedin.data.template.StringMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowConfig;
@@ -146,14 +150,13 @@ public class FlowSpec implements Configurable, Spec {
* @param key
* @param value
*/
- public synchronized void addProperty(String key, String value) {
+ public synchronized void addProperty(String key, Long value) {
this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
/* Make sure configAsProperties has been initialized. If it's just
initialized, setting the property will be a
redundant operation. However, if it already existed we need to update/add
the key-value pair.
*/
this.getConfigAsProperties();
- this.configAsProperties.setProperty(key, value);
-
+ this.configAsProperties.setProperty(key, value.toString());
}
public void addCompilationError(String src, String dst, String errorMessage,
int numberOfHops) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
index 74b534f22..3621361a9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecProducer.java
@@ -16,7 +16,7 @@
*/
package org.apache.gobblin.runtime.spec_executorInstance;
-import com.typesafe.config.Config;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -24,7 +24,11 @@ import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
+
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -70,7 +74,7 @@ public class LocalFsSpecProducer implements
SpecProducer<Spec> {
private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
if (spec instanceof JobSpec) {
// format the JobSpec to have file of <flowGroup>_<flowName>.job
- String flowExecutionId = ((JobSpec)
spec).getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ long flowExecutionId = ((JobSpec)
spec).getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
String jobFileName = getJobFileName(spec.getUri(), flowExecutionId);
try (
FileOutputStream fStream = new FileOutputStream(this.specProducerPath
+ File.separatorChar + jobFileName);
@@ -117,7 +121,7 @@ public class LocalFsSpecProducer implements
SpecProducer<Spec> {
throw new UnsupportedOperationException();
}
- public static String getJobFileName(URI specUri, String flowExecutionId) {
+ public static String getJobFileName(URI specUri, long flowExecutionId) {
String[] uriTokens = specUri.getPath().split("/");
return String.join("_", uriTokens) + "_" + flowExecutionId + ".job";
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
index 539f42d53..3581abc18 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
@@ -17,15 +17,18 @@
package org.apache.gobblin.runtime.api;
-import com.typesafe.config.Config;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.FlowId;
+
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowId;
+
public class FlowSpecTest {
@@ -38,7 +41,7 @@ public class FlowSpecTest {
public void testAddProperty() throws URISyntaxException {
String flowGroup = "myGroup";
String flowName = "myName";
- String flowExecutionId = "1234";
+ long flowExecutionId = 1234L;
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
@@ -52,13 +55,13 @@ public class FlowSpecTest {
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId);
Properties updatedProperties = flowSpec.getConfigAsProperties();
-
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
flowExecutionId);
+
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
String.valueOf(flowExecutionId));
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY),
flowName);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
"true");
Config updatedConfig = flowSpec.getConfig();
-
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
flowExecutionId);
+
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
String.valueOf(flowExecutionId));
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY),
flowName);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
"true");
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 588db58d6..add618616 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -92,7 +92,7 @@ public class DagActionReminderScheduler {
String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
- String flowExecutionId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ long flowExecutionId =
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index cdaf56c8a..3e0735bd3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -46,16 +46,16 @@ public interface DagActionStore {
class DagAction {
final String flowGroup;
final String flowName;
- final String flowExecutionId;
+ final long flowExecutionId;
final String jobName;
final DagActionType dagActionType;
final boolean isReminder;
- public DagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType) {
+ public DagAction(String flowGroup, String flowName, long flowExecutionId,
String jobName, DagActionType dagActionType) {
this(flowGroup, flowName, flowExecutionId, jobName, dagActionType,
false);
}
- public static DagAction forFlow(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) {
+ public static DagAction forFlow(String flowGroup, String flowName, long
flowExecutionId, DagActionType dagActionType) {
return new DagAction(flowGroup, flowName, flowExecutionId,
NO_JOB_NAME_DEFAULT, dagActionType);
}
@@ -67,16 +67,14 @@ public interface DagActionStore {
* Replace flow execution id with agreed upon event time to easily track
the flow
*/
public DagAction updateFlowExecutionId(long eventTimeMillis) {
- return new DagAction(this.getFlowGroup(), this.getFlowName(),
- String.valueOf(eventTimeMillis), this.getJobName(),
this.getDagActionType());
+ return new DagAction(this.getFlowGroup(), this.getFlowName(),
eventTimeMillis, this.getJobName(), this.getDagActionType());
}
/**
* Creates and returns a {@link DagNodeId} for this DagAction.
*/
public DagNodeId getDagNodeId() {
- return new DagNodeId(this.flowGroup, this.flowName,
- Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
+ return new DagNodeId(this.flowGroup, this.flowName,
this.flowExecutionId, this.flowGroup, this.jobName);
}
/**
@@ -97,7 +95,7 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- boolean exists(String flowGroup, String flowName, String flowExecutionId,
String jobName, DagActionType dagActionType) throws IOException, SQLException;
+ boolean exists(String flowGroup, String flowName, long flowExecutionId,
String jobName, DagActionType dagActionType) throws IOException, SQLException;
/**
* Check if an action exists in dagAction store by flow group, flow name,
and flow execution id, it assumes jobName is
@@ -108,7 +106,7 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- boolean exists(String flowGroup, String flowName, String flowExecutionId,
DagActionType dagActionType) throws IOException, SQLException;
+ boolean exists(String flowGroup, String flowName, long flowExecutionId,
DagActionType dagActionType) throws IOException, SQLException;
/** Persist the {@link DagAction} in {@link DagActionStore} for durability */
default void addDagAction(DagAction dagAction) throws IOException {
@@ -129,7 +127,7 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType) throws
IOException;
+ void addJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName, DagActionType dagActionType) throws
IOException;
/**
* Persist the dag action in {@link DagActionStore} for durability. This
method assumes an empty jobName.
@@ -139,7 +137,7 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- default void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) throws IOException {
+ default void addFlowDagAction(String flowGroup, String flowName, long
flowExecutionId, DagActionType dagActionType) throws IOException {
addDagAction(DagAction.forFlow(flowGroup, flowName, flowExecutionId,
dagActionType));
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 6a6140d80..7cfe0ff94 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -206,7 +206,7 @@ public interface DagManagementStateStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- boolean existsJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ boolean existsJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException,
SQLException;
/**
@@ -218,7 +218,7 @@ public interface DagManagementStateStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- boolean existsFlowDagAction(String flowGroup, String flowName, String
flowExecutionId,
+ boolean existsFlowDagAction(String flowGroup, String flowName, long
flowExecutionId,
DagActionStore.DagActionType dagActionType) throws IOException,
SQLException;
/** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore}
for durability */
@@ -240,7 +240,7 @@ public interface DagManagementStateStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ void addJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException;
/**
@@ -251,7 +251,7 @@ public interface DagManagementStateStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- default void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId,
+ default void addFlowDagAction(String flowGroup, String flowName, long
flowExecutionId,
DagActionStore.DagActionType dagActionType) throws IOException {
addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName,
flowExecutionId, dagActionType));
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5c03b35f6..ee80cbb65 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -173,9 +173,9 @@ public class DagManager extends AbstractIdleService {
public static class DagId {
String flowGroup;
String flowName;
- String flowExecutionId;
+ long flowExecutionId;
- public DagId(String flowGroup, String flowName, String flowExecutionId) {
+ public DagId(String flowGroup, String flowName, long flowExecutionId) {
this.flowGroup = flowGroup;
this.flowName = flowName;
this.flowExecutionId = flowExecutionId;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 92d81c7be..d4b5233a9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -76,7 +76,7 @@ public class DagManagerUtils {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- String flowExecutionId =
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
}
@@ -117,7 +117,7 @@ public class DagManagerUtils {
private static DagManager.DagId generateDagId(Config jobConfig) {
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- String flowExecutionId =
jobConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
}
@@ -127,10 +127,6 @@ public class DagManagerUtils {
}
public static DagManager.DagId generateDagId(String flowGroup, String
flowName, long flowExecutionId) {
- return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
- }
-
- public static DagManager.DagId generateDagId(String flowGroup, String
flowName, String flowExecutionId) {
return new DagManager.DagId(flowGroup, flowName, flowExecutionId);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index da50c5055..862c73f96 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -270,19 +270,19 @@ public class MostlyMySqlDagManagementStateStore
implements DagManagementStateSto
}
@Override
- public boolean existsJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ public boolean existsJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException,
SQLException {
return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
}
@Override
- public boolean existsFlowDagAction(String flowGroup, String flowName, String
flowExecutionId,
+ public boolean existsFlowDagAction(String flowGroup, String flowName, long
flowExecutionId,
DagActionStore.DagActionType dagActionType) throws IOException,
SQLException {
return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
dagActionType);
}
@Override
- public void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName,
+ public void addJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException {
this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index ee1fc5173..12d786966 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -97,12 +97,12 @@ public class MysqlDagActionStore implements DagActionStore {
}
@Override
- public boolean exists(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType) throws
IOException, SQLException {
+ public boolean exists(String flowGroup, String flowName, long
flowExecutionId, String jobName, DagActionType dagActionType) throws
IOException, SQLException {
return
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
int i = 0;
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
- existStatement.setString(++i, flowExecutionId);
+ existStatement.setString(++i, String.valueOf(flowExecutionId));
existStatement.setString(++i, jobName);
existStatement.setString(++i, dagActionType.toString());
ResultSet rs = null;
@@ -122,19 +122,19 @@ public class MysqlDagActionStore implements
DagActionStore {
}
@Override
- public boolean exists(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) throws IOException, SQLException {
+ public boolean exists(String flowGroup, String flowName, long
flowExecutionId, DagActionType dagActionType) throws IOException, SQLException {
return exists(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT,
dagActionType);
}
@Override
- public void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType)
+ public void addJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName, DagActionType dagActionType)
throws IOException {
dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
try {
int i = 0;
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
- insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, String.valueOf(flowExecutionId));
insertStatement.setString(++i, jobName);
insertStatement.setString(++i, dagActionType.toString());
return insertStatement.executeUpdate();
@@ -151,7 +151,7 @@ public class MysqlDagActionStore implements DagActionStore {
int i = 0;
deleteStatement.setString(++i, dagAction.getFlowGroup());
deleteStatement.setString(++i, dagAction.getFlowName());
- deleteStatement.setString(++i, dagAction.getFlowExecutionId());
+ deleteStatement.setString(++i,
String.valueOf(dagAction.getFlowExecutionId()));
deleteStatement.setString(++i, dagAction.getJobName());
deleteStatement.setString(++i, dagAction.getDagActionType().toString());
int result = deleteStatement.executeUpdate();
@@ -163,17 +163,17 @@ public class MysqlDagActionStore implements
DagActionStore {
}
// TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
- private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, String jobName, DagActionType dagActionType,
ExponentialBackoff exponentialBackoff)
+ private DagAction getDagActionWithRetry(String flowGroup, String flowName,
long flowExecutionId, String jobName, DagActionType dagActionType,
ExponentialBackoff exponentialBackoff)
throws IOException, SQLException {
return
dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT,
tableName), getStatement -> {
int i = 0;
getStatement.setString(++i, flowGroup);
getStatement.setString(++i, flowName);
- getStatement.setString(++i, flowExecutionId);
+ getStatement.setString(++i, String.valueOf(flowExecutionId));
getStatement.setString(++i, dagActionType.toString());
try (ResultSet rs = getStatement.executeQuery()) {
if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), rs.getString(4), DagActionType.valueOf(rs.getString(5)));
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getLong(3), rs.getString(4), DagActionType.valueOf(rs.getString(5)));
} else if (exponentialBackoff.awaitNextRetryIfAvailable()) {
return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
jobName, dagActionType, exponentialBackoff);
} else {
@@ -194,7 +194,7 @@ public class MysqlDagActionStore implements DagActionStore {
HashSet<DagAction> result = new HashSet<>();
try (ResultSet rs = getAllStatement.executeQuery()) {
while (rs.next()) {
- result.add(new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), rs.getString(4), DagActionType.valueOf(rs.getString(5))));
+ result.add(new DagAction(rs.getString(1), rs.getString(2),
rs.getLong(3), rs.getString(4), DagActionType.valueOf(rs.getString(5))));
}
return result;
} catch (SQLException e) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index d82d4f9a7..25caaf224 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -219,7 +219,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
DagActionStore.DagAction launchDagAction =
DagActionStore.DagAction.forFlow(
flowGroup,
flowName,
- String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec)),
+ FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH
);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index c90907d23..b52f361dc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -74,7 +74,7 @@ public class DagProcUtils {
for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
dagManagementStateStore.addJobDagAction(jobExecutionPlan.getFlowGroup(),
jobExecutionPlan.getFlowName(),
- String.valueOf(jobExecutionPlan.getFlowExecutionId()),
jobExecutionPlan.getJobName(), DagActionStore.DagActionType.REEVALUATE);
+ jobExecutionPlan.getFlowExecutionId(),
jobExecutionPlan.getJobName(), DagActionStore.DagActionType.REEVALUATE);
}
}
}
@@ -194,7 +194,7 @@ public class DagProcUtils {
private static void
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(),
dagNode.getValue().getFlowName(),
- String.valueOf(dagNode.getValue().getFlowExecutionId()),
dagNode.getValue().getJobName(),
+ dagNode.getValue().getFlowExecutionId(),
dagNode.getValue().getJobName(),
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 4359a6d88..242290656 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -180,7 +180,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
private void
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore) {
DagActionStore.DagAction enforceFlowFinishDeadlineDagAction =
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
- getDagNodeId().getFlowName(),
String.valueOf(getDagNodeId().getFlowExecutionId()),
+ getDagNodeId().getFlowName(), getDagNodeId().getFlowExecutionId(),
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
log.info("Deleting reminder trigger and dag action {}",
enforceFlowFinishDeadlineDagAction);
// todo - add metrics
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 1b5e21083..4fb0fd5fb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -71,12 +71,12 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
protected void addDagAction(String flowGroup, String flowName, Long
flowExecutionId, DagActionStore.DagActionType actionType) {
try {
// If an existing resume request is still pending then do not accept
this request
- if (this.dagManagementStateStore.existsFlowDagAction(flowGroup,
flowName, flowExecutionId.toString(), actionType)) {
+ if (this.dagManagementStateStore.existsFlowDagAction(flowGroup,
flowName, flowExecutionId, actionType)) {
this.throwErrorResponse("There is already a pending " + actionType + "
action for this flow. Please wait to resubmit and wait "
+ "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
- this.dagManagementStateStore.addFlowDagAction(flowGroup, flowName,
flowExecutionId.toString(), actionType);
+ this.dagManagementStateStore.addFlowDagAction(flowGroup, flowName,
flowExecutionId, actionType);
} catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add %s action for flow %s %s %s to dag
action store due to:", actionType, flowGroup,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 7f566a56b..5706ad9cc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -199,7 +199,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
String operation =
value.getChangeEventIdentifier().getOperationType().name();
String flowGroup = value.getFlowGroup();
String flowName = value.getFlowName();
- String flowExecutionId = value.getFlowExecutionId();
+ long flowExecutionId = Long.parseLong(value.getFlowExecutionId());
String jobName = value.getJobName();
produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
@@ -259,12 +259,10 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction
change ({}) received for flow: {}",
dagAction.getDagActionType(), dagAction);
if
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.RESUME)) {
- dagManager.handleResumeFlowRequest(dagAction.getFlowGroup(),
dagAction.getFlowName(),
- Long.parseLong(dagAction.getFlowExecutionId()));
+ dagManager.handleResumeFlowRequest(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId());
this.resumesInvoked.mark();
} else if
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.KILL)) {
- dagManager.handleKillFlowRequest(dagAction.getFlowGroup(),
dagAction.getFlowName(),
- Long.parseLong(dagAction.getFlowExecutionId()));
+ dagManager.handleKillFlowRequest(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId());
this.killsInvoked.mark();
} else if
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.LAUNCH)) {
// If multi-active scheduler is NOT turned on we should not receive
these type of events
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 6e60392ef..7caec9a4c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -218,7 +218,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ long flowExecutionId =
jobStatus.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
String storeName = jobStatusStoreName(flowGroup, flowName);
@@ -260,9 +260,10 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
- private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore, String flowGroup, String flowName, String
flowExecutionId, String jobName) {
+ private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore, String flowGroup,
+ String flowName, long flowExecutionId, String jobName) {
DagActionStore.DagAction enforceStartDeadlineDagAction = new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId), jobName,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+ flowExecutionId, jobName,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
log.info("Deleting reminder trigger and dag action {}",
enforceStartDeadlineDagAction);
// todo - add metrics
@@ -293,7 +294,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ long flowExecutionId =
jobStatus.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
String storeName = jobStatusStoreName(flowGroup, flowName);
@@ -404,12 +405,8 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
return new org.apache.gobblin.configuration.State(mergedState);
}
- public static String jobStatusTableName(String flowExecutionId, String
jobGroup, String jobName) {
- return
Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId,
jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
- }
-
public static String jobStatusTableName(long flowExecutionId, String
jobGroup, String jobName) {
- return jobStatusTableName(String.valueOf(flowExecutionId), jobGroup,
jobName);
+ return
Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId,
jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
}
public static String jobStatusStoreName(String flowGroup, String flowName) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 887ee5827..8a9346571 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -61,7 +61,7 @@ public class LocalFsJobStatusRetriever extends
JobStatusRetriever {
// Local FS has no monitor to update job state yet, for now check if
standalone is completed with job, and mark as done
// Otherwise the job is pending
try {
- String fileName = LocalFsSpecProducer.getJobFileName(new
URI(File.separatorChar + flowGroup + File.separatorChar + flowName),
String.valueOf(flowExecutionId)) + suffix;
+ String fileName = LocalFsSpecProducer.getJobFileName(new
URI(File.separatorChar + flowGroup + File.separatorChar + flowName),
flowExecutionId) + suffix;
return new File(this.specProducerPath + File.separatorChar +
fileName).exists();
} catch (URISyntaxException e) {
log.error("URISyntaxException occurred when retrieving job status for
flow: {},{}", flowGroup, flowName, e);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index cb2ac540b..9b634f9fe 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -29,16 +29,18 @@ import org.quartz.spi.OperableTrigger;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.base.Joiner;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
public class DagActionReminderSchedulerTest {
String flowGroup = "fg";
String flowName = "fn";
- String flowExecutionId = "123";
+ long flowExecutionId = 123L;
String jobName = "jn";
- String expectedKey = String.join(".", flowGroup, flowName, flowExecutionId,
jobName,
- String.valueOf(DagActionStore.DagActionType.LAUNCH));
+ String expectedKey = Joiner.on(".").join(flowGroup, flowName,
flowExecutionId, jobName,
+ DagActionStore.DagActionType.LAUNCH.name());
DagActionStore.DagAction launchDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
DagActionStore.DagActionType.LAUNCH);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 8d4ff795a..205f02dbc 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -104,7 +104,7 @@ public class DagManagementTaskStreamImplTest {
statuses that should cause the next() method to continue polling for tasks
before finally providing the
LeaseObtainedStatus to the taskStream to break its loop and return a
newly created dagTask
*/
- DagActionStore.DagAction launchAction = new DagActionStore.DagAction("fg",
"fn", "12345", "jn", DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.DagAction launchAction = new DagActionStore.DagAction("fg",
"fn", 12345L, "jn", DagActionStore.DagActionType.LAUNCH);
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 88a572d73..342b379f4 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -71,8 +71,8 @@ public class DagManagerFlowTest {
private static final String TABLE = "dag_action_store";
private static final String flowGroup = "testFlowGroup";
private static final String flowName = "testFlowName";
- private static final String flowExecutionId = "12345677";
- private static final String flowExecutionId_2 = "12345678";
+ private static final long flowExecutionId = 12345677L;
+ private static final long flowExecutionId_2 = 12345678L;
private ITestMetastoreDatabase testDb;
private DagActionStore dagActionStore;
@@ -136,9 +136,9 @@ public class DagManagerFlowTest {
// mock add spec
// for very first dag to be added, add dag action to store and check its
deleted by the addDag call
- dagManager.getDagActionStore().get().addFlowDagAction("group0", "flow0",
Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH);
+ dagManager.getDagActionStore().get().addFlowDagAction("group0", "flow0",
flowExecutionId1, DagActionStore.DagActionType.LAUNCH);
dagManager.addDag(dag1, true, true);
- Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0",
"flow0", Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH));
+ Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0",
"flow0", flowExecutionId1, DagActionStore.DagActionType.LAUNCH));
dagManager.addDag(dag2, true, true);
dagManager.addDag(dag3, true, true);
@@ -345,15 +345,14 @@ public class DagManagerFlowTest {
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
// Add kill action to action store and call kill
- dagActionStore.addFlowDagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), DagActionStore.DagActionType.KILL);
+ dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionType.KILL);
dagManager.handleKillFlowRequest(flowGroup, flowName, flowExecutionId);
// Check that the kill dag action is removed
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input -> {
try {
- return !dagActionStore.exists(flowGroup, flowName,
String.valueOf(flowExecutionId),
- DagActionStore.DagActionType.KILL);
+ return !dagActionStore.exists(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionType.KILL);
} catch (IOException | SQLException e) {
throw new RuntimeException(e);
}
@@ -362,13 +361,13 @@ public class DagManagerFlowTest {
// Add resume action to action store and call resume
- dagActionStore.addFlowDagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
+ dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionType.RESUME);
dagManager.handleResumeFlowRequest(flowGroup, flowName, flowExecutionId);
// Check that the resume dag action is removed
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(input
-> {
try {
- return !dagActionStore.exists(flowGroup, flowName,
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
+ return !dagActionStore.exists(flowGroup, flowName,
flowExecutionId, DagActionStore.DagActionType.RESUME);
} catch (IOException | SQLException e) {
throw new RuntimeException(e);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 92345122b..b3a924454 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -129,9 +129,9 @@ public class DagProcessingEngineTest {
throw new RuntimeException("Simulating an exception to stop the
thread!");
}
if (i % FAILING_DAGS_FREQUENCY == 0 ) {
- return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, "1234" + i, "jn-" + i, DagActionStore.DagActionType.LAUNCH), true);
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), true);
} else {
- return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, "1234" + i, "jn-" + i, DagActionStore.DagActionType.LAUNCH), false);
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), false);
}
}
}
@@ -160,7 +160,7 @@ public class DagProcessingEngineTest {
@Override
public DagManager.DagId getDagId() {
- return new DagManager.DagId("fg", "fn", "12345");
+ return new DagManager.DagId("fg", "fn", 12345L);
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index b4feeed84..197ceda76 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -35,7 +35,7 @@ public class FlowLaunchHandlerTest {
String cronExpressionSuffix =
truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction("flowName", "flowGroup",
- String.valueOf(flowExecutionId), "jobName",
DagActionStore.DagActionType.LAUNCH);
+ flowExecutionId, "jobName", DagActionStore.DagActionType.LAUNCH);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit,
minimumLingerDurationMillis);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 0c39abd16..faf6cd28b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -42,9 +42,9 @@ public class MysqlDagActionStoreTest {
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final String jobName_2 = "testJobName2";
- private static final String flowExecutionId = "12345677";
- private static final String flowExecutionId_2 = "12345678";
- private static final String flowExecutionId_3 = "12345679";
+ private static final long flowExecutionId = 12345677L;
+ private static final long flowExecutionId_2 = 12345678L;
+ private static final long flowExecutionId_3 = 12345679L;
private ITestMetastoreDatabase testDb;
private MysqlDagActionStore mysqlDagActionStore;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 607d8f3b0..bcb208fa7 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -51,7 +51,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowGroup2 = "testFlowGroup2";
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
- private static final String flowExecutionId = "12345677";
+ private static final long flowExecutionId = 12345677L;
// Dag actions with the same flow info but different flow action types are
considered unique
private static DagActionStore.DagAction launchDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
@@ -110,7 +110,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
- new DagActionStore.DagAction(flowGroup, flowName,
String.valueOf(firstObtainedStatus.getEventTimeMillis()),
+ new DagActionStore.DagAction(flowGroup, flowName,
firstObtainedStatus.getEventTimeMillis(),
jobName, DagActionStore.DagActionType.LAUNCH)));
// Verify that different DagAction types for the same flow can have leases
at the same time
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 52d80469f..0fb261515 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -94,7 +94,7 @@ public class EnforceDeadlineDagProcsTest {
dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
- new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+ new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
"job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
@@ -130,7 +130,7 @@ public class EnforceDeadlineDagProcsTest {
dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
is in running state
EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new
EnforceFlowFinishDeadlineDagProc(
- new EnforceFlowFinishDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+ new EnforceFlowFinishDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
"job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index b7c86eebe..0976f6c59 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -93,7 +93,7 @@ public class KillDagProcTest {
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow1",
- String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
+ flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.LAUNCH),
null, this.dagManagementStateStore), flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
@@ -106,7 +106,7 @@ public class KillDagProcTest {
}).collect(Collectors.toList());
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow1",
- String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.KILL),
+ flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.KILL),
null, this.dagManagementStateStore));
killDagProc.process(this.dagManagementStateStore);
@@ -136,7 +136,7 @@ public class KillDagProcTest {
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow2",
- String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH),
+ flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.LAUNCH),
null, this.dagManagementStateStore), flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
@@ -149,7 +149,7 @@ public class KillDagProcTest {
}).collect(Collectors.toList());
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow2",
- String.valueOf(flowExecutionId), "job2",
DagActionStore.DagActionType.KILL),
+ flowExecutionId, "job2", DagActionStore.DagActionType.KILL),
null, this.dagManagementStateStore));
killDagProc.process(this.dagManagementStateStore);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index b4c5e0109..1e7580e7f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -59,6 +59,7 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.util.ConfigUtils;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -89,8 +90,8 @@ public class LaunchDagProcTest {
public void launchDag() throws IOException, InterruptedException,
URISyntaxException, ExecutionException {
String flowGroup = "fg";
String flowName = "fn";
- String flowExecutionId = "12345";
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
Long.parseLong(flowExecutionId),
+ long flowExecutionId = 12345L;
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5",
ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName)));
@@ -117,14 +118,14 @@ public class LaunchDagProcTest {
Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
- .addFlowDagAction(any(), any(), any(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
+ .addFlowDagAction(any(), any(), anyLong(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
}
@Test
public void launchDagWithMultipleParallelJobs() throws IOException,
InterruptedException, URISyntaxException {
String flowGroup = "fg";
String flowName = "fn";
- String flowExecutionId = "12345";
+ long flowExecutionId = 12345L;
Dag<JobExecutionPlan> dag =
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5",
ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
@@ -150,7 +151,7 @@ public class LaunchDagProcTest {
// / \
// D5 D6
- public static Dag<JobExecutionPlan>
buildDagWithMultipleNodesAtDifferentLevels(String id, String flowExecutionId,
+ public static Dag<JobExecutionPlan>
buildDagWithMultipleNodesAtDifferentLevels(String id, long flowExecutionId,
String flowFailureOption, String proxyUser, Config additionalConfig)
throws URISyntaxException {
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index c5ac11f69..057788173 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -57,6 +57,7 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -120,7 +121,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+ flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
long addSpecCount = specProducers.stream()
@@ -175,7 +176,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+ flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
// no new job to launch for this one job flow
@@ -214,7 +215,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null,
+ flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
dagManagementStateStore));
reEvaluateDagProc.process(dagManagementStateStore);
@@ -228,7 +229,7 @@ public class ReevaluateDagProcTest {
// no job's state is deleted because that happens when the job finishes
triggered the reevaluate dag proc
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagNodeState(any(), any());
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), any(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagActionReminderScheduler,
Mockito.never()).unscheduleReminderJob(any());
}
@@ -236,7 +237,7 @@ public class ReevaluateDagProcTest {
@Test
public void testMultipleNextJobToRun() throws Exception {
String flowName = "fn4";
- Dag<JobExecutionPlan> dag =
LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1",
String.valueOf(flowExecutionId),
+ Dag<JobExecutionPlan> dag =
LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1",
flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
@@ -260,7 +261,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId), "job3",
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+ flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE),
null, dagManagementStateStore));
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
// process 4th job
reEvaluateDagProc.process(dagManagementStateStore);
@@ -268,7 +269,7 @@ public class ReevaluateDagProcTest {
int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th
job passes, i.e. 5th and 6th job
// parallel jobs are launched through reevaluate dag action
Mockito.verify(dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
- .addJobDagAction(eq(flowGroup), eq(flowName),
eq(String.valueOf(flowExecutionId)), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
+ .addJobDagAction(eq(flowGroup), eq(flowName), eq(flowExecutionId),
any(), eq(DagActionStore.DagActionType.REEVALUATE));
// when there are parallel jobs to launch, they are not directly sent to
spec producers, instead reevaluate dag action is created
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 08fff6fe1..c68bde24e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -96,7 +96,7 @@ public class ResumeDagProcTest {
doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- String.valueOf(flowExecutionId),
MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
+ flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.RESUME),
null, this.dagManagementStateStore));
resumeDagProc.process(this.dagManagementStateStore);