This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 59df057 [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job
beyond max attempt number (#3439)
59df057 is described below
commit 59df057cf6424362a9f6d9a6954dbc925e7944af
Author: Zihan Li <[email protected]>
AuthorDate: Fri Dec 17 11:46:30 2021 -0800
[GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max
attempt number (#3439)
* [GOBBLIN-1585]GaaS (DagManager) keep retrying a failed job beyond max
attempt number
* address comments
* adding current attempts in job config and cluster events
* add generation into job status
* address comments
* change comments
* address comments
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../gobblin/cluster/GobblinHelixJobLauncher.java | 9 +++
.../apache/gobblin/metrics/event/TimingEvent.java | 2 +
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 9 +++
.../gobblin/service/monitoring/JobStatus.java | 1 +
.../service/monitoring/JobStatusRetriever.java | 3 +-
.../service/modules/orchestration/DagManager.java | 1 +
.../modules/orchestration/DagManagerUtils.java | 18 +++++-
.../modules/orchestration/TimingEventUtils.java | 1 +
.../service/modules/spec/JobExecutionPlan.java | 1 +
.../service/monitoring/KafkaJobStatusMonitor.java | 29 +++++++---
.../service/monitoring/JobStatusRetrieverTest.java | 64 ++++++++++++++++++++--
12 files changed, 124 insertions(+), 16 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 0ec72c3..36bc680 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -156,6 +156,8 @@ public class ConfigurationKeys {
public static final String JOB_GROUP_KEY = "job.group";
public static final String JOB_TAG_KEY = "job.tag";
public static final String JOB_DESCRIPTION_KEY = "job.description";
+ public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
+ public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
// Job launcher type
public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
public static final String JOB_SCHEDULE_KEY = "job.schedule";
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 80b7423..b7315e7 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -608,6 +608,15 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobExecutionId)));
}
+ if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+ "false"));
+ }
+
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 65f6e84..50fb856 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -89,6 +89,8 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String PROCESSED_COUNT_FIELD = "processedCount";
public static final String MAX_ATTEMPTS_FIELD = "maxAttempts";
public static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
+ // This value should always move forward; more details can be found in the
method {@link KafkaJobStatusMonitor#addJobStatusToStateStore}
+ public static final String CURRENT_GENERATION_FIELD = "currentGeneration";
public static final String SHOULD_RETRY_FIELD = "shouldRetry";
}
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 9756544..7f99f93 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -388,6 +388,15 @@ public class AzkabanJobLauncher extends AbstractJob
implements ApplicationLaunch
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+ if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+ "false"));
+ }
+
// use job execution id if flow execution id is not present
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobExecutionId)));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 6917e92..8dd9113 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -55,6 +55,7 @@ public class JobStatus {
private final String highWatermark;
private final int maxAttempts;
private final int currentAttempts;
+ private final int currentGeneration;
private final boolean shouldRetry;
private final Supplier<List<Issue>> issues;
private final int progressPercentage;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 4bb9ece..26fc76f 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -126,6 +126,7 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
long processedCount =
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD,
"0"));
int maxAttempts =
Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD,
"1"));
int currentAttempts =
Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
"1"));
+ int currentGeneration =
Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
"1"));
boolean shouldRetry =
Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
"false"));
int progressPercentage =
jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
long lastProgressEventTime =
jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);
@@ -146,7 +147,7 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
return
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
-
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
+
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
issues(jobIssues).build();
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index bfc1f7a..7986dce 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -547,6 +547,7 @@ public class DagManager extends AbstractIdleService {
node.getValue().setExecutionStatus(PENDING_RESUME);
// reset currentAttempts because we do not want to count previous
execution's attempts in deciding whether to retry a job
node.getValue().setCurrentAttempts(0);
+ DagManagerUtils.incrementJobGeneration(node);
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 565c021..5afa128 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,6 +16,8 @@
*/
package org.apache.gobblin.service.modules.orchestration;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.ConfigFactory;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -148,7 +150,12 @@ public class DagManagerUtils {
}
public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
- return dagNode.getValue().getJobSpec();
+ JobSpec jobSpec = dagNode.getValue().getJobSpec();
+ Map<String, Integer> configWithCurrentAttempts =
ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS,
dagNode.getValue().getCurrentAttempts(),
+ ConfigurationKeys.JOB_CURRENT_GENERATION,
dagNode.getValue().getCurrentGeneration());
+ // Return a new spec carrying the new config, to avoid modifying the JobSpec held by dagNode
+ return new JobSpec(jobSpec.getUri(), jobSpec.getVersion(),
jobSpec.getDescription(),
ConfigFactory.parseMap(configWithCurrentAttempts).withFallback(jobSpec.getConfig()),
+ jobSpec.getConfigAsProperties(), jobSpec.getTemplateURI(),
jobSpec.getJobTemplate(), jobSpec.getMetadata());
}
static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) {
@@ -237,6 +244,15 @@ public class DagManagerUtils {
}
/**
+ * Increment the value of {@link JobExecutionPlan#currentGeneration}
+ * This method is not thread safe, we achieve correctness by making sure
+ * one dag will only be handled in the same DagManagerThread
+ */
+ static void incrementJobGeneration(DagNode<JobExecutionPlan> dagNode) {
+
dagNode.getValue().setCurrentGeneration(dagNode.getValue().getCurrentGeneration()
+ 1);
+ }
+
+ /**
* Flow start time is the same as the flow execution id which is the
timestamp flow request was received, unless it
* is a resumed flow, in which case it is {@link
JobExecutionPlan#getFlowStartTime()}
* @param dagNode dag node in context
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 1f65833..01b6ff4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -62,6 +62,7 @@ class TimingEventUtils {
jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
specExecutor.getClass().getCanonicalName());
jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD,
Integer.toString(jobExecutionPlan.getMaxAttempts()));
jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
Integer.toString(jobExecutionPlan.getCurrentAttempts()));
+ jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
Integer.toString(jobExecutionPlan.getCurrentGeneration()));
jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
Boolean.toString(false));
return jobMetadata;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 0cc4be3..d4467ea 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -65,6 +65,7 @@ public class JobExecutionPlan {
private final SpecExecutor specExecutor;
private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
private final int maxAttempts;
+ private int currentGeneration = 1;
private int currentAttempts = 0;
private Optional<Future> jobFuture = Optional.absent();
private long flowStartTime = 0L;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9c4255e..262849c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -253,14 +253,25 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
if (states.size() > 0) {
- String previousStatus = states.get(states.size() -
1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-
- // PENDING_RESUME is allowed to override, because it happens when a flow
is being resumed from previously being failed
- if (previousStatus != null && currentStatus != null &&
!currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
- log.warn(String.format("Received status %s when status is already %s
for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, previousStatus, flowGroup, flowName,
flowExecutionId, jobGroup, jobName));
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ // This keeps the change backward compatible, as cluster events may not
carry this info.
+ // If that info is missing, we treat the event as coming from the
same generation and attempt as the previous one.
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
+ // We use three things (generation, attempts, and job status) to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
+ // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. This two-part form prevents the composite value from
ever repeating.
+ // The job status, in turn, reflects the execution status within a single attempt.
+ if (previousStatus != null && currentStatus != null &&
+ (previousGeneration > currentGeneration
+ || (previousGeneration == currentGeneration && previousAttempts
> currentAttempts)
+ || (previousGeneration == currentGeneration && previousAttempts
== currentAttempts
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
+ log.warn(String.format("Received status [generation.attempts] = %s
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, currentGeneration, currentAttempts, previousStatus,
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId,
jobGroup, jobName));
jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
} else {
jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
@@ -275,7 +286,9 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
- if
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
&& currentAttempts < maxAttempts) {
+ // SHOULD_RETRY_FIELD may be reset by a JOB_COMPLETION_PERCENTAGE event, so PENDING_RETRY is checked here as well
+ if
((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
+ ||
state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name()))
&& currentAttempts < maxAttempts) {
state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
state.setProp(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.PENDING_RETRY.name());
state.removeProp(TimingEvent.JOB_END_TIME);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 55d9315..dc17d04 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -60,19 +60,21 @@ public abstract class JobStatusRetrieverTest {
abstract void setUp() throws Exception;
protected void addJobStatusToStateStore(long flowExecutionId, String
jobName, String status) throws IOException {
- addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId,
jobName, status, 0, 0);
+ addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId,
jobName, status, 0, 0, new Properties());
}
protected void addFlowIdJobStatusToStateStore(String flowGroup, String
flowName, long flowExecutionId, String jobName, String status) throws
IOException {
- addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId,
jobName, status, 0, 0);
+ addFlowIdJobStatusToStateStore(flowGroup, flowName, flowExecutionId,
jobName, status, 0, 0, new Properties());
}
protected void addJobStatusToStateStore(long flowExecutionId, String
jobName, String status, long startTime, long endTime) throws IOException {
- addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId,
jobName, status, startTime, endTime);
+ addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId,
jobName, status, startTime, endTime, new Properties());
+ }
+ protected void addJobStatusToStateStore(long flowExecutionId, String
jobName, String status, long startTime, long endTime, Properties properties)
throws IOException {
+ addFlowIdJobStatusToStateStore(FLOW_GROUP, FLOW_NAME, flowExecutionId,
jobName, status, startTime, endTime, properties);
}
- protected void addFlowIdJobStatusToStateStore(String flowGroup, String
flowName, long flowExecutionId, String jobName, String status, long startTime,
long endTime) throws IOException {
- Properties properties = new Properties();
+ protected void addFlowIdJobStatusToStateStore(String flowGroup, String
flowName, long flowExecutionId, String jobName, String status, long startTime,
long endTime, Properties properties) throws IOException {
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
String.valueOf(flowExecutionId));
@@ -98,6 +100,57 @@ public abstract class JobStatusRetrieverTest {
KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore());
}
+ static Properties createAttemptsProperties(int currGen, int currAttempts,
boolean shouldRetry) {
+ Properties properties = new Properties();
+
properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
String.valueOf(currGen));
+
properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
String.valueOf(currAttempts));
+ properties.setProperty(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
String.valueOf(shouldRetry));
+ return properties;
+ }
+ @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+ public void testOutOfOrderJobTimingEventsForRetryingJob() throws IOException
{
+ long flowExecutionId = 1240L;
+ Properties properties = createAttemptsProperties(1, 0, false);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.FAILED.name(), 0, 0, properties);
+ Iterator<JobStatus>
+ jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId);
+ JobStatus jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(),
ExecutionStatus.PENDING_RETRY.name());
+ Assert.assertEquals(jobStatus.isShouldRetry(), true);
+ properties = createAttemptsProperties(1, 1, false);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME, properties);
+ jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId);
+ jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(),
ExecutionStatus.RUNNING.name());
+ Assert.assertEquals(jobStatus.isShouldRetry(), false);
+ Assert.assertEquals(jobStatus.getCurrentAttempts(), 1);
+ Properties properties_new = createAttemptsProperties(2, 0, false);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.PENDING_RESUME.name(), JOB_START_TIME, JOB_START_TIME,
properties_new);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, properties);
+ jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId);
+ jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(),
ExecutionStatus.PENDING_RESUME.name());
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.COMPLETE.name(), JOB_END_TIME, JOB_END_TIME, properties_new);
+ jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId);
+ jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(),
ExecutionStatus.COMPLETE.name());
+ }
+
@Test
public void testGetJobStatusesForFlowExecution() throws IOException {
long flowExecutionId = 1234L;
@@ -180,7 +233,6 @@ public abstract class JobStatusRetrieverTest {
Assert.assertEquals(jobStatus.getEndTime(), JOB_END_TIME);
Assert.assertEquals(jobStatus.getOrchestratedTime(),
JOB_ORCHESTRATED_TIME);
}
-
@Test (dependsOnMethods = "testJobTiming")
public void testGetJobStatusesForFlowExecution1() {
long flowExecutionId = 1234L;