[
https://issues.apache.org/jira/browse/GOBBLIN-1634?focusedWorklogId=763322&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763322
]
ASF GitHub Bot logged work on GOBBLIN-1634:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 28/Apr/22 07:07
Start Date: 28/Apr/22 07:07
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3495:
URL: https://github.com/apache/gobblin/pull/3495#discussion_r860553707
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
+ public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1),
+ createJobSLAKilledEvent(),
+ createJobOrchestratedEvent(1),
+ createJobStartSLAKilledEvent(),
+ // simulate retry again as maximum attempt is 2, but verify that kill
event will not retry
+ createJobOrchestratedEvent(1),
+ createJobCancelledEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = new
MockKafkaAvroJobStatusMonitor("test",config, 1);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ StateStore stateStore = jobStatusMonitor.getStateStore();
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
+ String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+ List<State> stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ State state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId,
this.jobGroup, this.jobName);
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Because the maximum attempt is set to 2, so the state is set to
PENDING_RETRY after the first failure
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
Review Comment:
I ended up moving as much of the repeated code as possible into functions, so I think the
current state is readable enough. What do you think? I wanted to avoid
having to sleep after each emission while waiting for Kafka to pick up the event.
Issue Time Tracking
-------------------
Worklog Id: (was: 763322)
Time Spent: 1h 20m (was: 1h 10m)
> GaaS Flow SLA Kills should be retryable if configured
> -----------------------------------------------------
>
> Key: GOBBLIN-1634
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1634
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: William Lo
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> On Gobblin as a Service (GaaS), flows can fail SLAs if they do not receive a Kafka
> event within some designated amount of time.
> Since GaaS supports retries on failures, these failures due to SLAs should
> also be retryable.
> However, if the flow is cancelled by a user-specified event through the API,
> we do not want to retry.
> Additionally, we do not want to retry if a flow is skipped because
> concurrent jobs are running at the same time: without a more sophisticated
> waiting algorithm, it is unlikely that the running job will have finished by the
> time the flow is retried, so the retry would only waste resources.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)