This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9f50a25 [GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order
9f50a25 is described below
commit 9f50a2563cc257039da44018663b6b9e119fb499
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Nov 21 15:29:34 2019 -0800
[GOBBLIN-974] Avoid updating job/flow status if messages arrive out of order
Closes #2820 from jack-moseley/status-order
---
.../service/monitoring/KafkaJobStatusMonitor.java | 33 +++++++++++++++-------
.../monitoring/KafkaAvroJobStatusMonitorTest.java | 13 +++++++++
2 files changed, 36 insertions(+), 10 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 8c8ea2a..c3b1427 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -74,6 +75,10 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private static final Config DEFAULTS =
ConfigFactory.parseMap(ImmutableMap.of(
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
+ private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
+ .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING,
+ ExecutionStatus.COMPLETE, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
+
public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
@@ -146,7 +151,20 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
String storeName = jobStatusStoreName(flowGroup, flowName);
String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
- jobStatus = mergedProperties(storeName, tableName, jobStatus, stateStore);
+ List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
+ if (states.size() > 0) {
+ String previousStatus = states.get(states.size() - 1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+
+ if (previousStatus != null && currentStatus != null && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+ < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
+ log.warn(String.format("Received status %s when status is already %s for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, previousStatus, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ return;
+ }
+ }
+
+ jobStatus = mergedProperties(jobStatus, states);
modifyStateIfRetryRequired(jobStatus);
@@ -163,17 +181,12 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
}
- private static org.apache.gobblin.configuration.State mergedProperties(
- String storeName, String tableName, org.apache.gobblin.configuration.State jobStatus, StateStore stateStore) {
+ private static org.apache.gobblin.configuration.State mergedProperties(org.apache.gobblin.configuration.State jobStatus,
+ List<org.apache.gobblin.configuration.State> states) {
Properties mergedProperties = new Properties();
- try {
- List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
- mergedProperties.putAll(states.get(states.size() - 1).getProperties());
- }
- } catch (Exception e) {
- log.warn("Could not get previous state for {} {}", storeName, tableName, e);
+ if (states.size() > 0) {
+ mergedProperties.putAll(states.get(states.size() - 1).getProperties());
}
mergedProperties.putAll(jobStatus.getProperties());
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index 14668ee..202146d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -106,6 +106,10 @@ public class KafkaAvroJobStatusMonitorTest {
context.submitEvent(event5);
kafkaReporter.report();
+ GobblinTrackingEvent event6 = createJobStartEvent();
+ context.submitEvent(event6);
+ kafkaReporter.report();
+
try {
Thread.sleep(1000);
} catch(InterruptedException ex) {
@@ -159,6 +163,15 @@ public class KafkaAvroJobStatusMonitorTest {
messageAndMetadata = iterator.next();
Assert.assertNull(jobStatusMonitor.parseJobStatus(messageAndMetadata.message()));
+
+ // Check that state didn't get set to running since it was already complete
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
}
private GobblinTrackingEvent createFlowCompiledEvent() {