[
https://issues.apache.org/jira/browse/GOBBLIN-1634?focusedWorklogId=763203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763203
]
ASF GitHub Bot logged work on GOBBLIN-1634:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 28/Apr/22 00:18
Start Date: 28/Apr/22 00:18
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3495:
URL: https://github.com/apache/gobblin/pull/3495#discussion_r860338652
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
+ public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1),
+ createJobSLAKilledEvent(),
+ createJobOrchestratedEvent(1),
+ createJobStartSLAKilledEvent(),
+ // simulate retry again as maximum attempt is 2, but verify that kill
event will not retry
+ createJobOrchestratedEvent(1),
+ createJobCancelledEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = new
MockKafkaAvroJobStatusMonitor("test",config, 1);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ StateStore stateStore = jobStatusMonitor.getStateStore();
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
+ String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+ List<State> stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ State state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId,
this.jobGroup, this.jobName);
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Because the maximum attempt is set to 2, so the state is set to
PENDING_RETRY after the first failure
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
Review Comment:
I think moreso than async publication it's delay between publication and
reads, since the test suite does seem to actually push to a test kafka instance
that is read directly.
I think I can try something like the third option where I can map an event
to an expected assertion (or set of assertions)
Issue Time Tracking
-------------------
Worklog Id: (was: 763203)
Time Spent: 50m (was: 40m)
> GaaS Flow SLA Kills should be retryable if configured
> -----------------------------------------------------
>
> Key: GOBBLIN-1634
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1634
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: William Lo
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> On Gobblin as a Service flows can fail SLAs if they do not receive a Kafka
> event in some designated amount of time.
> Since GaaS supports retrys on failures, these failures due to SLAs should
> also be retryable.
> However, if the flow is cancelled from a user specified event through the API
> we do not want to retry.
> Additionally, we also do not want to retry if a flow is skipped due to
> concurrent jobs running at the same time, as it is unlikely without a more
> sophisticated waiting algorithm that the job will be finished by the time the
> job is retried again, wasting resources.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)