This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0469447 [GOBBLIN-998] ExecutionStatus should be reset to PENDING
before a job retries
0469447 is described below
commit 0469447f0732bf61a39d38af45e8c84715f87853
Author: Chen Guo <[email protected]>
AuthorDate: Tue Dec 10 10:58:42 2019 -0800
[GOBBLIN-998] ExecutionStatus should be reset to PENDING before a job
retries
Closes #2843 from enjoyear/GOBBLIN-998
---
.../apache/gobblin/service/ExecutionStatus.pdsc | 9 +-
...ache.gobblin.service.flowstatuses.snapshot.json | 17 +-
.../service/modules/orchestration/DagManager.java | 3 +
.../modules/orchestration/DagManagerUtils.java | 2 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 7 +-
.../modules/orchestration/DagManagerTest.java | 19 ++-
.../monitoring/KafkaAvroJobStatusMonitorTest.java | 173 +++++++++++++++++++--
7 files changed, 203 insertions(+), 27 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
index 5a9635e..3040eab 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
@@ -3,14 +3,15 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
- "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED",
"COMPLETE", "PENDING"],
+ "symbols" : ["COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED",
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
"COMPILED":"Flow compiled to jobs.",
+ "PENDING":"Flow or job is in pending state.",
+ "PENDING_RETRY":"Flow or job is pending retry.",
"ORCHESTRATED":"Job(s) orchestrated to spec executors.",
"RUNNING": "Flow or job is currently executing",
- "FAILED":"Flow or job failed",
- "CANCELLED":"Flow cancelled.",
"COMPLETE":"Flow or job completed execution",
- "PENDING":"Flow or job is in pending state."
+ "FAILED":"Flow or job failed",
+ "CANCELLED":"Flow cancelled."
}
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index a161726..8b60aa1 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -36,15 +36,16 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
- "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED",
"CANCELLED", "COMPLETE", "PENDING" ],
+ "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED",
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
- "COMPILED" : "Flow compiled to jobs.",
- "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
- "RUNNING" : "Flow or job is currently executing",
- "FAILED" : "Flow or job failed",
- "CANCELLED" : "Flow cancelled.",
- "COMPLETE" : "Flow or job completed execution",
- "PENDING" : "Flow or job is in pending state."
+ "COMPILED":"Flow compiled to jobs.",
+ "PENDING":"Flow or job is in pending state.",
+ "PENDING_RETRY":"Flow or job is pending retry.",
+ "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
+ "RUNNING": "Flow or job is currently executing",
+ "COMPLETE":"Flow or job completed execution",
+ "FAILED":"Flow or job failed",
+ "CANCELLED":"Flow cancelled."
}
}, {
"type" : "record",
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 048365e..c3f22cc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -523,6 +523,9 @@ public class DagManager extends AbstractIdleService {
case PENDING:
jobExecutionPlan.setExecutionStatus(PENDING);
break;
+ case PENDING_RETRY:
+ jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
+ break;
default:
jobExecutionPlan.setExecutionStatus(RUNNING);
break;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 98e6fa4..6bfd7de 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -182,7 +182,7 @@ public class DagManagerUtils {
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
- if (executionStatus == ExecutionStatus.PENDING) {
+ if (executionStatus == ExecutionStatus.PENDING || executionStatus ==
ExecutionStatus.PENDING_RETRY) {
//Add a node to be executed next, only if all of its parent nodes are
COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c3b1427..be0947d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -76,8 +76,9 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES =
ImmutableList
- .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING,
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING,
- ExecutionStatus.COMPLETE, ExecutionStatus.FAILED,
ExecutionStatus.CANCELLED);
+ .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING,
ExecutionStatus.PENDING_RETRY,
+ ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING,
ExecutionStatus.COMPLETE,
+ ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
throws ReflectiveOperationException {
@@ -176,7 +177,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
int currentAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
if
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
&& currentAttempts < maxAttempts) {
state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
- state.setProp(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.RUNNING.name());
+ state.setProp(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.PENDING_RETRY.name());
state.removeProp(TimingEvent.JOB_END_TIME);
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 04f48fb..c74314c 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -424,7 +424,9 @@ public class DagManagerTest {
Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.RUNNING), true);
Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.RUNNING), true);
Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.RUNNING), true);
- Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.PENDING_RETRY), true);
+ Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.FAILED));
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(),
@@ -433,7 +435,9 @@ public class DagManagerTest {
thenReturn(jobStatusIterator2).
thenReturn(jobStatusIterator3).
thenReturn(jobStatusIterator4).
- thenReturn(jobStatusIterator5);
+ thenReturn(jobStatusIterator5).
+ thenReturn(jobStatusIterator6).
+ thenReturn(jobStatusIterator7);
// Run 4 times, first job fails every time and is retried
for (int i = 0; i < 4; i++) {
@@ -447,6 +451,17 @@ public class DagManagerTest {
Assert.assertEquals(dag.getStartNodes().get(0).getValue().getCurrentAttempts(),
i + 1);
}
+ // Got a PENDING_RETRY state
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertEquals(this.jobToDag.size(), 1);
+ Assert.assertEquals(this.dagToJobs.size(), 1);
+
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertEquals(this.jobToDag.size(), 1);
+ Assert.assertEquals(this.dagToJobs.size(), 1);
+
// Last run fails and dag is cleaned up
this._dagManagerThread.run();
Assert.assertEquals(this.dags.size(), 0);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index 202146d..eb68cf4 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -65,6 +66,8 @@ public class KafkaAvroJobStatusMonitorTest {
private String jobExecutionId = "1111";
private String message = "https://myServer:8143/1234/1111";
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
+ private MetricContext context;
+ private KafkaAvroEventKeyValueReporter.Builder<?> builder;
@BeforeClass
public void setUp() throws Exception {
@@ -79,18 +82,22 @@ public class KafkaAvroJobStatusMonitorTest {
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" +
this.kafkaTestHelper.getKafkaServerPort()))));
//Create an event reporter instance
- MetricContext context = MetricContext.builder("context").build();
- KafkaAvroEventKeyValueReporter.Builder<?> builder =
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+ context = MetricContext.builder("context").build();
+ builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
builder =
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
- KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic");
+ }
+
+ @Test
+ public void testProcessMessageForSuccessfulFlow() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic1");
//Submit GobblinTrackingEvents to Kafka
GobblinTrackingEvent event1 = createFlowCompiledEvent();
context.submitEvent(event1);
kafkaReporter.report();
- GobblinTrackingEvent event2 = createJobOrchestratedEvent();
+ GobblinTrackingEvent event2 = createJobOrchestratedEvent(1);
context.submitEvent(event2);
kafkaReporter.report();
@@ -115,10 +122,7 @@ public class KafkaAvroJobStatusMonitorTest {
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
- }
- @Test
- public void testProcessMessage() throws IOException,
ReflectiveOperationException {
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
.withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
KafkaJobStatusMonitor jobStatusMonitor = new
KafkaAvroJobStatusMonitor("test",config, 1);
@@ -174,6 +178,123 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
}
+ @Test
+ public void testProcessMessageForFailedFlow() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic2");
+
+ //Submit GobblinTrackingEvents to Kafka
+ GobblinTrackingEvent event1 = createFlowCompiledEvent();
+ context.submitEvent(event1);
+ kafkaReporter.report();
+
+ //set maximum attempt to 2, and current attempt to 1
+ GobblinTrackingEvent event2 = createJobOrchestratedEvent(1);
+ context.submitEvent(event2);
+ kafkaReporter.report();
+
+ GobblinTrackingEvent event3 = createJobStartEvent();
+ context.submitEvent(event3);
+ kafkaReporter.report();
+
+ GobblinTrackingEvent event4 = createJobFailedEvent();
+ context.submitEvent(event4);
+ kafkaReporter.report();
+
+ //Mimic retrying - job orchestration
+ //set maximum attempt to 2, and current attempt to 2
+ GobblinTrackingEvent event5 = createJobOrchestratedEvent(2);
+ context.submitEvent(event5);
+ kafkaReporter.report();
+
+ //Mimic retrying - job start (current attempt = 2)
+ GobblinTrackingEvent event6 = createJobStartEvent();
+ context.submitEvent(event6);
+ kafkaReporter.report();
+
+ //Mimic retrying - job failed again (current attempt = 2)
+ GobblinTrackingEvent event7 = createJobFailedEvent();
+ context.submitEvent(event7);
+ kafkaReporter.report();
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ KafkaJobStatusMonitor jobStatusMonitor = new
KafkaAvroJobStatusMonitor("test",config, 1);
+
+ ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+ MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ StateStore stateStore = jobStatusMonitor.getStateStore();
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
+ String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+ List<State> stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ State state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId,
this.jobGroup, this.jobName);
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
+
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Because the maximum number of attempts is 2, the state is set to
PENDING_RETRY after the first failure
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //The job is running again on its second (retry) attempt, so the state
is back to RUNNING
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
+
+ messageAndMetadata = iterator.next();
+ jobStatusMonitor.processMessage(messageAndMetadata);
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Because the maximum number of attempts is 2, the state is set to FAILED
after the second failure
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.FAILED.name());
+
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
+ }
+
private GobblinTrackingEvent createFlowCompiledEvent() {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
@@ -188,11 +309,19 @@ public class KafkaAvroJobStatusMonitorTest {
return event;
}
- private GobblinTrackingEvent createJobOrchestratedEvent() {
+ /**
+ * Create a Job Orchestrated Event with a configurable currentAttempt
+ * @param currentAttempt specifies the current attempt number for the
JobOrchestration event
+ * @return the {@link GobblinTrackingEvent}
+ */
+ private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt) {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
String name = TimingEvent.LauncherTimings.JOB_ORCHESTRATED;
Map<String, String> metadata = Maps.newHashMap();
+ metadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "2");
+ metadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
String.valueOf(currentAttempt));
+ metadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
Boolean.toString(false));
metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
this.flowGroup);
metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
this.flowName);
metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
this.flowExecutionId);
@@ -243,6 +372,24 @@ public class KafkaAvroJobStatusMonitorTest {
return event;
}
+ private GobblinTrackingEvent createJobFailedEvent() {
+ String namespace = "org.apache.gobblin.metrics";
+ Long timestamp = System.currentTimeMillis();
+ String name = TimingEvent.LauncherTimings.JOB_FAILED;
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
this.flowGroup);
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
this.flowName);
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
this.flowExecutionId);
+ metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+ metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
this.jobGroup);
+ metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD,
this.jobExecutionId);
+ metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+ metadata.put(TimingEvent.METADATA_START_TIME, "7");
+ metadata.put(TimingEvent.METADATA_END_TIME, "8");
+
+ GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp,
namespace, name, metadata);
+ return event;
+ }
/**
* Create a dummy event to test if it is filtered out by the consumer.
*/
@@ -265,11 +412,19 @@ public class KafkaAvroJobStatusMonitorTest {
}
}
+ @AfterMethod
+ public void cleanUpStateStore() {
+ try {
+ cleanUpDir(stateStoreDir);
+ } catch(Exception e) {
+ System.err.println("Failed to clean up the state store.");
+ }
+ }
+
@AfterClass
public void tearDown() {
try {
this.kafkaTestHelper.close();
- cleanUpDir(stateStoreDir);
} catch(Exception e) {
System.err.println("Failed to close Kafka server.");
}