[
https://issues.apache.org/jira/browse/GOBBLIN-1634?focusedWorklogId=763170&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763170
]
ASF GitHub Bot logged work on GOBBLIN-1634:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Apr/22 21:58
Start Date: 27/Apr/22 21:58
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3495:
URL: https://github.com/apache/gobblin/pull/3495#discussion_r860238661
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+
+    // Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate another retry, as the maximum attempt count is 2, but verify that the kill event will not retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test", config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+    String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+    List<State> stateList = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
+    stateList = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // Because the maximum attempt count is 2, the state is set to PENDING_RETRY after the first (SLA-kill) failure
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+    Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // Start-SLA kill likewise marks the job PENDING_RETRY
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
Review Comment:
I do see the difficulty of potentially async publication (is that why you `Thread.sleep(1000)`?) with the events. That said, I have difficulty at this point recalling which message should be coming from each particular `recordIterator.next()`.
There may not be a good approach here, but ones we could always consider would be to:
a. publish one message at a time (and keep reading one by one from the same `recordIterator`)
b. write an abstraction to publish one message and return a `recordIterator` for it (or better still, a single element from the iterator)
c. condense each set of assertions (following `stateList = stateStore.getAll(storeName, tableName)`) as a lambda, then define each event in the same place near its corresponding assertion lambda.
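Option (c) above can be sketched as follows. This is only a hypothetical illustration of the pattern, not Gobblin code: `Step`, `process`, and the plain `String` "events" are stand-ins for `processMessage` plus the state-store lookup, so each event sits right beside the assertion it should satisfy.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of option (c): keep each published event next to the
// assertion lambda that checks the state it should produce.
public class EventAssertionPairs {

  // pairs one event with the check to run right after it is processed
  record Step(String event, Consumer<String> assertion) {}

  // stand-in for processMessage(...) + stateStore.getAll(...): here the
  // "latest state" is simply the event name itself
  static String process(String event) {
    return event;
  }

  public static void main(String[] args) {
    List<Step> steps = List.of(
        new Step("FlowCompiled", s -> expect(s, "FlowCompiled")),
        new Step("JobOrchestrated", s -> expect(s, "JobOrchestrated")),
        new Step("FlowSLAKilled", s -> expect(s, "FlowSLAKilled")));

    for (Step step : steps) {
      String latestState = process(step.event()); // "process" one message
      step.assertion().accept(latestState);       // verify immediately
    }
    System.out.println("all steps verified");
  }

  static void expect(String actual, String expected) {
    if (!actual.equals(expected)) {
      throw new AssertionError("expected " + expected + " but was " + actual);
    }
  }
}
```

In the real test, the `process` stand-in would be replaced by `processMessage(recordIterator.next())` followed by the `stateStore.getAll(storeName, tableName)` lookup, keeping each expectation adjacent to the event that produces it.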
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java:
##########
@@ -72,6 +72,8 @@ public static class FlowTimings {
public static final String FLOW_FAILED = "FlowFailed";
public static final String FLOW_RUNNING = "FlowRunning";
public static final String FLOW_CANCELLED = "FlowCancelled";
+ public static final String FLOW_SLA_KILLED = "FlowSLAKilled";
+ public static final String FLOW_START_SLA_KILLED = "FlowStartSLAKilled";
Review Comment:
<meta>I'm having trouble separating what may be a personal pet peeve vs. overall clarity.</meta>
I find 'SLA' a confusing term for what is essentially a deadline. Therefore I'd suggest semantic names in our state space, such as:
```
FLOW_RUN_DEADLINE_EXCEEDED
FLOW_START_DEADLINE_EXCEEDED
```
<meta>Still, I don't wish to insist this is semantically clearer for future maintainers (seems like users won't observe the state, correct?), should you find this merely personal preference on my part.</meta>
For background, an SLA is an enforceable expectation: it should be monitored, with deviations detected, but I am not familiar w/ any precedent for immediately terminating processing upon violation.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -279,7 +278,8 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
     }
     modifyStateIfRetryRequired(jobStatus);
-
+    // Remove data that need not be stored in the state store
+    jobStatus.removeProp(TimingEvent.FlowEventConstants.IS_FLOW_SLA_KILLED);
Review Comment:
could be a case for putting the `removeProp` within `modifyStateIf...`
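The suggested refactor could look roughly like the sketch below. `State` here is a minimal stand-in for `org.apache.gobblin.configuration.State`, and the key name and retry logic are illustrative assumptions rather than the actual Gobblin implementation; the point is only that the flag is stripped in the same method that consumes it.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: fold the removeProp cleanup into modifyStateIfRetryRequired so
// callers cannot forget it. Key names and logic are illustrative only.
public class RetryStateCleanup {

  static class State {
    private final Map<String, String> props = new HashMap<>();
    void setProp(String key, String value) { props.put(key, value); }
    void removeProp(String key) { props.remove(key); }
    boolean contains(String key) { return props.containsKey(key); }
    String getProp(String key) { return props.get(key); }
  }

  static final String IS_FLOW_SLA_KILLED = "isFlowSlaKilled"; // illustrative key

  static void modifyStateIfRetryRequired(State jobStatus) {
    // consult the flag while deciding how to mark the job ...
    boolean slaKilled = Boolean.parseBoolean(jobStatus.getProp(IS_FLOW_SLA_KILLED));
    if (slaKilled) {
      jobStatus.setProp("eventName", "PENDING_RETRY"); // illustrative retry marker
    }
    // ... and strip it in the same method, since it need not reach the state store
    jobStatus.removeProp(IS_FLOW_SLA_KILLED);
  }

  public static void main(String[] args) {
    State jobStatus = new State();
    jobStatus.setProp(IS_FLOW_SLA_KILLED, "true");
    modifyStateIfRetryRequired(jobStatus);
    System.out.println(jobStatus.contains(IS_FLOW_SLA_KILLED)); // flag stripped
  }
}
```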
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java:
##########
@@ -72,6 +72,8 @@ public static class FlowTimings {
public static final String FLOW_FAILED = "FlowFailed";
public static final String FLOW_RUNNING = "FlowRunning";
public static final String FLOW_CANCELLED = "FlowCancelled";
+ public static final String FLOW_SLA_KILLED = "FlowSLAKilled";
+ public static final String FLOW_START_SLA_KILLED = "FlowStartSLAKilled";
Review Comment:
and overall, I didn't initially see a way to define these semantics w/o
adding new `ExecutionStatus` states, but that's what you did, and AFAICT it's a
better approach!
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -558,6 +656,63 @@ private GobblinTrackingEvent createJobFailedEvent() {
     GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
     return event;
   }
+
+  private GobblinTrackingEvent createJobCancelledEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.FlowTimings.FLOW_CANCELLED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    metadata.put(TimingEvent.METADATA_START_TIME, "7");
+    metadata.put(TimingEvent.METADATA_END_TIME, "8");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    return event;
+  }
+
+  private GobblinTrackingEvent createJobSLAKilledEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.FlowTimings.FLOW_SLA_KILLED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
Review Comment:
seems a lot of boilerplate/duplication here. could we capture within a
common function that each of these could pass their specifics into? e.g.
```
GobblinTrackingEvent createGTE(String eventName, Map<String, String> customMetadata)
```
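Such a helper might be sketched as below. `Event` is a plain-Java stand-in for `org.apache.gobblin.metrics.GobblinTrackingEvent` (whose real constructor the test calls as `new GobblinTrackingEvent(timestamp, namespace, name, metadata)`), and the field keys and sample values are illustrative, mirroring the test's `TimingEvent` constants.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the suggested createGTE helper: build the flow/job metadata shared
// by every create*Event() in one place; callers pass only the event name plus
// event-specific entries.
public class TrackingEventFactory {

  record Event(long timestamp, String namespace, String name, Map<String, String> metadata) {}

  static Event createGTE(String eventName, Map<String, String> customMetadata) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put("flowGroup", "myFlowGroup");   // shared fields (values illustrative)
    metadata.put("flowName", "myFlowName");
    metadata.put("flowExecutionId", "123");
    metadata.put("jobName", "myJobName");
    metadata.put("jobGroup", "myJobGroup");
    metadata.put("jobExecutionId", "456");
    metadata.putAll(customMetadata);            // event-specific additions/overrides
    return new Event(System.currentTimeMillis(), "org.apache.gobblin.metrics", eventName, metadata);
  }

  public static void main(String[] args) {
    // a cancelled-event factory then collapses to a one-liner
    Event cancelled = createGTE("FlowCancelled",
        Map.of("message", "cancelled by user", "startTime", "7", "endTime", "8"));
    System.out.println(cancelled.name() + " " + cancelled.metadata().get("startTime"));
  }
}
```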
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+
+    // Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate another retry, as the maximum attempt count is 2, but verify that the kill event will not retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test", config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
Review Comment:
could we encapsulate this? e.g.
```MockKafkaAvroJobStatusMonitor createKAJSM(...)```
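Such a factory might look like the sketch below. `Monitor` and the config keys are plain-Java stand-ins for Gobblin's `Config` and `MockKafkaAvroJobStatusMonitor`, so the shape is illustrative only; the point is that the repeated `ConfigFactory` wiring and the `buildMetricsContextAndMetrics()` call live behind one helper.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the suggested createKAJSM(...) factory: gather the repeated
// config wiring and metrics setup in one place. Types and keys are stand-ins.
public class MonitorFactory {

  record Monitor(String name, Map<String, String> config, boolean metricsReady) {}

  static Monitor createKAJSM(String name, String stateStoreDir, int numThreads) {
    Map<String, String> config = new HashMap<>();
    config.put("kafka.brokers", "localhost:0000");   // illustrative keys
    config.put("state.store.dir", stateStoreDir);
    config.put("zookeeper.connect", "localhost:2121");
    config.put("num.threads", String.valueOf(numThreads));
    // the real helper would also invoke buildMetricsContextAndMetrics() here
    return new Monitor(name, config, true);
  }

  public static void main(String[] args) {
    Monitor monitor = createKAJSM("test", "/tmp/state-store", 1);
    System.out.println(monitor.metricsReady() + " " + monitor.config().get("num.threads"));
  }
}
```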
Issue Time Tracking
-------------------
Worklog Id: (was: 763170)
Time Spent: 20m (was: 10m)
> GaaS Flow SLA Kills should be retryable if configured
> -----------------------------------------------------
>
> Key: GOBBLIN-1634
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1634
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: William Lo
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> On Gobblin as a Service, flows can miss their SLAs if they do not receive a
> Kafka event within some designated amount of time.
> Since GaaS supports retries on failures, failures due to missed SLAs should
> also be retryable.
> However, if the flow is cancelled via a user-specified event through the API,
> we do not want to retry.
> Additionally, we also do not want to retry if a flow is skipped due to
> concurrent jobs running at the same time: without a more sophisticated
> waiting algorithm, the job is unlikely to be finished by the time it is
> retried again, wasting resources.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)