Will-Lo commented on code in PR #3495:
URL: https://github.com/apache/gobblin/pull/3495#discussion_r860338652


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void 
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate retry again as maximum attempt is 2, but verify that kill 
event will not retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
+        
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor =  new 
MockKafkaAvroJobStatusMonitor("test",config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+      this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
+    String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, 
this.jobGroup, this.jobName);
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());

Review Comment:
   I think moreso than async publication it's delay between publication and 
reads, since the test suite does seem to actually push to a test kafka instance 
that is read directly.
   
   I think I can try something like the third option where I can map an event 
to an expected assertion (or set of assertions)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to