This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0469447  [GOBBLIN-998] ExecutionStatus should be reset to PENDING before a job retries
0469447 is described below

commit 0469447f0732bf61a39d38af45e8c84715f87853
Author: Chen Guo <[email protected]>
AuthorDate: Tue Dec 10 10:58:42 2019 -0800

    [GOBBLIN-998] ExecutionStatus should be reset to PENDING before a job retries
    
    Closes #2843 from enjoyear/GOBBLIN-998
---
 .../apache/gobblin/service/ExecutionStatus.pdsc    |   9 +-
 ...ache.gobblin.service.flowstatuses.snapshot.json |  17 +-
 .../service/modules/orchestration/DagManager.java  |   3 +
 .../modules/orchestration/DagManagerUtils.java     |   2 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |   7 +-
 .../modules/orchestration/DagManagerTest.java      |  19 ++-
 .../monitoring/KafkaAvroJobStatusMonitorTest.java  | 173 +++++++++++++++++++--
 7 files changed, 203 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
index 5a9635e..3040eab 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
@@ -3,14 +3,15 @@
   "name" : "ExecutionStatus",
   "namespace" : "org.apache.gobblin.service",
   "doc" : "Execution status for a flow or job",
-  "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED", 
"COMPLETE", "PENDING"],
+  "symbols" : ["COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", 
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
   "symbolDocs" : {
     "COMPILED":"Flow compiled to jobs.",
+    "PENDING":"Flow or job is in pending state.",
+    "PENDING_RETRY":"Flow or job is pending retry.",
     "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
     "RUNNING": "Flow or job is currently executing",
-    "FAILED":"Flow or job failed",
-    "CANCELLED":"Flow cancelled.",
     "COMPLETE":"Flow or job completed execution",
-    "PENDING":"Flow or job is in pending state."
+    "FAILED":"Flow or job failed",
+    "CANCELLED":"Flow cancelled."
   }
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index a161726..8b60aa1 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -36,15 +36,16 @@
     "name" : "ExecutionStatus",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Execution status for a flow or job",
-    "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", 
"CANCELLED", "COMPLETE", "PENDING" ],
+    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", 
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
     "symbolDocs" : {
-      "COMPILED" : "Flow compiled to jobs.",
-      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
-      "RUNNING" : "Flow or job is currently executing",
-      "FAILED" : "Flow or job failed",
-      "CANCELLED" : "Flow cancelled.",
-      "COMPLETE" : "Flow or job completed execution",
-      "PENDING" : "Flow or job is in pending state."
+      "COMPILED":"Flow compiled to jobs.",
+      "PENDING":"Flow or job is in pending state.",
+      "PENDING_RETRY":"Flow or job is pending retry.",
+      "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
+      "RUNNING": "Flow or job is currently executing",
+      "COMPLETE":"Flow or job completed execution",
+      "FAILED":"Flow or job failed",
+      "CANCELLED":"Flow cancelled."
     }
   }, {
     "type" : "record",
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 048365e..c3f22cc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -523,6 +523,9 @@ public class DagManager extends AbstractIdleService {
           case PENDING:
             jobExecutionPlan.setExecutionStatus(PENDING);
             break;
+          case PENDING_RETRY:
+            jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
+            break;
           default:
             jobExecutionPlan.setExecutionStatus(RUNNING);
             break;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 98e6fa4..6bfd7de 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -182,7 +182,7 @@ public class DagManagerUtils {
       DagNode<JobExecutionPlan> node = nodesToExpand.poll();
       ExecutionStatus executionStatus = getExecutionStatus(node);
       boolean addFlag = true;
-      if (executionStatus == ExecutionStatus.PENDING) {
+      if (executionStatus == ExecutionStatus.PENDING || executionStatus == 
ExecutionStatus.PENDING_RETRY) {
         //Add a node to be executed next, only if all of its parent nodes are 
COMPLETE.
         List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
         for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c3b1427..be0947d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -76,8 +76,9 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
 
   private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = 
ImmutableList
-      .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, 
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING,
-          ExecutionStatus.COMPLETE, ExecutionStatus.FAILED, 
ExecutionStatus.CANCELLED);
+      .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, 
ExecutionStatus.PENDING_RETRY,
+          ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, 
ExecutionStatus.COMPLETE,
+          ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
 
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
       throws ReflectiveOperationException {
@@ -176,7 +177,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     int currentAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
     if 
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
 && currentAttempts < maxAttempts) {
       state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
-      state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.RUNNING.name());
+      state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.PENDING_RETRY.name());
       state.removeProp(TimingEvent.JOB_END_TIME);
     }
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 04f48fb..c74314c 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -424,7 +424,9 @@ public class DagManagerTest {
     Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING), true);
     Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING), true);
     Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING), true);
-    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.PENDING_RETRY), true);
+    Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.FAILED));
 
 
     
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
@@ -433,7 +435,9 @@ public class DagManagerTest {
         thenReturn(jobStatusIterator2).
         thenReturn(jobStatusIterator3).
         thenReturn(jobStatusIterator4).
-        thenReturn(jobStatusIterator5);
+        thenReturn(jobStatusIterator5).
+        thenReturn(jobStatusIterator6).
+        thenReturn(jobStatusIterator7);
 
     // Run 4 times, first job fails every time and is retried
     for (int i = 0; i < 4; i++) {
@@ -447,6 +451,17 @@ public class DagManagerTest {
       
Assert.assertEquals(dag.getStartNodes().get(0).getValue().getCurrentAttempts(), 
i + 1);
     }
 
+    // Got a PENDING_RETRY state
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertEquals(this.dagToJobs.size(), 1);
+
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertEquals(this.dagToJobs.size(), 1);
+
     // Last run fails and dag is cleaned up
     this._dagManagerThread.run();
     Assert.assertEquals(this.dags.size(), 0);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index 202146d..eb68cf4 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -65,6 +66,8 @@ public class KafkaAvroJobStatusMonitorTest {
   private String jobExecutionId = "1111";
   private String message = "https://myServer:8143/1234/1111";;
   private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
+  private MetricContext context;
+  private KafkaAvroEventKeyValueReporter.Builder<?> builder;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -79,18 +82,22 @@ public class KafkaAvroJobStatusMonitorTest {
             ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
this.kafkaTestHelper.getKafkaServerPort()))));
 
     //Create an event reporter instance
-    MetricContext context = MetricContext.builder("context").build();
-    KafkaAvroEventKeyValueReporter.Builder<?> builder = 
KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+    context = MetricContext.builder("context").build();
+    builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
     builder = 
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
         TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
-    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic");
+  }
+
+  @Test
+  public void testProcessMessageForSuccessfulFlow() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic1");
 
     //Submit GobblinTrackingEvents to Kafka
     GobblinTrackingEvent event1 = createFlowCompiledEvent();
     context.submitEvent(event1);
     kafkaReporter.report();
 
-    GobblinTrackingEvent event2 = createJobOrchestratedEvent();
+    GobblinTrackingEvent event2 = createJobOrchestratedEvent(1);
     context.submitEvent(event2);
     kafkaReporter.report();
 
@@ -115,10 +122,7 @@ public class KafkaAvroJobStatusMonitorTest {
     } catch(InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-  }
 
-  @Test
-  public void testProcessMessage() throws IOException, 
ReflectiveOperationException {
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
         .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
     KafkaJobStatusMonitor jobStatusMonitor =  new 
KafkaAvroJobStatusMonitor("test",config, 1);
@@ -174,6 +178,123 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
   }
 
+  @Test
+  public void testProcessMessageForFailedFlow() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic2");
+
+    //Submit GobblinTrackingEvents to Kafka
+    GobblinTrackingEvent event1 = createFlowCompiledEvent();
+    context.submitEvent(event1);
+    kafkaReporter.report();
+
+    //set maximum attempt to 2, and current attempt to 1
+    GobblinTrackingEvent event2 = createJobOrchestratedEvent(1);
+    context.submitEvent(event2);
+    kafkaReporter.report();
+
+    GobblinTrackingEvent event3 = createJobStartEvent();
+    context.submitEvent(event3);
+    kafkaReporter.report();
+
+    GobblinTrackingEvent event4 = createJobFailedEvent();
+    context.submitEvent(event4);
+    kafkaReporter.report();
+
+    //Mimic retrying - job orchestration
+    //set maximum attempt to 2, and current attempt to 2
+    GobblinTrackingEvent event5 = createJobOrchestratedEvent(2);
+    context.submitEvent(event5);
+    kafkaReporter.report();
+
+    //Mimic retrying - job start (current attempt = 2)
+    GobblinTrackingEvent event6 = createJobStartEvent();
+    context.submitEvent(event6);
+    kafkaReporter.report();
+
+    //Mimic retrying - job failed again (current attempt = 2)
+    GobblinTrackingEvent event7 = createJobFailedEvent();
+    context.submitEvent(event7);
+    kafkaReporter.report();
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    KafkaJobStatusMonitor jobStatusMonitor =  new 
KafkaAvroJobStatusMonitor("test",config, 1);
+
+    ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
+    String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, 
this.jobGroup, this.jobName);
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
+
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
+
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Because the maximum attempt is set to 2, so the state is set to Failed 
after trying twice
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.FAILED.name());
+    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
+  }
+
   private GobblinTrackingEvent createFlowCompiledEvent() {
     String namespace = "org.apache.gobblin.metrics";
     Long timestamp = System.currentTimeMillis();
@@ -188,11 +309,19 @@ public class KafkaAvroJobStatusMonitorTest {
     return event;
   }
 
-  private GobblinTrackingEvent createJobOrchestratedEvent() {
+  /**
+   * Create a Job Orchestrated Event with a configurable currentAttempt
+   * @param currentAttempt specify the number of attempts for the 
JobOrchestration event
+   * @return the {@link GobblinTrackingEvent}
+   */
+  private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt) {
     String namespace = "org.apache.gobblin.metrics";
     Long timestamp = System.currentTimeMillis();
     String name = TimingEvent.LauncherTimings.JOB_ORCHESTRATED;
     Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "2");
+    metadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
String.valueOf(currentAttempt));
+    metadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, 
Boolean.toString(false));
     metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
this.flowGroup);
     metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
this.flowName);
     metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
this.flowExecutionId);
@@ -243,6 +372,24 @@ public class KafkaAvroJobStatusMonitorTest {
     return event;
   }
 
+  private GobblinTrackingEvent createJobFailedEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.LauncherTimings.JOB_FAILED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
this.jobGroup);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, 
this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    metadata.put(TimingEvent.METADATA_START_TIME, "7");
+    metadata.put(TimingEvent.METADATA_END_TIME, "8");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, 
namespace, name, metadata);
+    return event;
+  }
   /**
    *   Create a dummy event to test if it is filtered out by the consumer.
    */
@@ -265,11 +412,19 @@ public class KafkaAvroJobStatusMonitorTest {
     }
   }
 
+  @AfterMethod
+  public void cleanUpStateStore() {
+    try {
+      cleanUpDir(stateStoreDir);
+    } catch(Exception e) {
+      System.err.println("Failed to clean up the state store.");
+    }
+  }
+
   @AfterClass
   public void tearDown() {
     try {
       this.kafkaTestHelper.close();
-      cleanUpDir(stateStoreDir);
     } catch(Exception e) {
       System.err.println("Failed to close Kafka server.");
     }

Reply via email to