This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 13faea46b [GOBBLIN-1782] Fix Merge State for Flow Pending Resume
statuses (#3639)
13faea46b is described below
commit 13faea46bd2f23999fb1bf9ea579296fb86d1e3d
Author: meethngala <[email protected]>
AuthorDate: Wed Feb 8 14:03:19 2023 -0800
[GOBBLIN-1782] Fix Merge State for Flow Pending Resume statuses (#3639)
* fix merge state for flow pending resume flow statuses
* address feedback on the PR
---------
Co-authored-by: Meeth Gala <[email protected]>
---
.../runtime/KafkaAvroJobStatusMonitorTest.java | 64 ++++++++++++++++++++++
.../service/monitoring/KafkaJobStatusMonitor.java | 13 ++++-
2 files changed, 75 insertions(+), 2 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 6f1069e15..b9a3521a8 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -429,6 +429,62 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
+ public void testProcessMessageForFlowPendingResume() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1, 2),
+ createJobCancelledEvent(),
+ createFlowPendingResumeEvent(),
+ createJobOrchestratedEvent(2, 2),
+ createJobStartEvent(),
+ createJobSucceededEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSObservabilityEventProducer());
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
"NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
+
+ // Job for flow pending resume status after it was cancelled or failed
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA",
"NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RESUME.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ //Job orchestrated for retrying
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPLETE.name());
+
+ jobStatusMonitor.shutDown();
+ }
+
@Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
public void testProcessProgressingMessageWhenNoPreviousStatus() throws
IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic5");
@@ -619,6 +675,14 @@ public class KafkaAvroJobStatusMonitorTest {
return createGTE(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED,
Maps.newHashMap());
}
+ private GobblinTrackingEvent createFlowPendingResumeEvent() {
+ GobblinTrackingEvent event =
createGTE(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, Maps.newHashMap());
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ event.getMetadata().remove(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ return event;
+
+ }
+
private GobblinTrackingEvent createGTE(String eventName, Map<String, String>
customMetadata) {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 7d1654af3..87e1c3b5a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -251,13 +251,17 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
+ // Verify whether the current job status is a flow status. If so, check
whether its current execution status is PENDING_RESUME (limiting to just
resume flow statuses)
+ // When both of these conditions are satisfied, we NEED NOT check for
out-of-order events, since GaaS would manage the lifecycle of these events
+ // Hence, we update the merge state accordingly so that the flow can
proceed with its execution to the next state in the DAG
+ boolean isFlowStatusAndPendingResume =
isFlowStatusAndPendingResume(jobName, jobGroup, currentStatus);
// We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
// The generation is monotonically increasing, while the attempts may
re-initialize back to 0. This two-part form prevents the composite value from
ever repeating.
// And the job status reflects the execution status in one attempt
- if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration || (
+ if (!isFlowStatusAndPendingResume && (previousStatus != null &&
currentStatus != null && (previousGeneration > currentGeneration || (
previousGeneration == currentGeneration && previousAttempts >
currentAttempts) || (previousGeneration == currentGeneration &&
previousAttempts == currentAttempts
&&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
- <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+ <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))))
{
log.warn(String.format(
"Received status [generation.attempts] = %s [%s.%s] when already
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
currentStatus, currentGeneration, currentAttempts,
previousStatus, previousGeneration, previousAttempts,
@@ -280,6 +284,11 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
+ private static boolean isFlowStatusAndPendingResume(String jobName, String
jobGroup, String currentStatus) {
+ return jobName != null && jobGroup != null &&
jobName.equals(JobStatusRetriever.NA_KEY) &&
jobGroup.equals(JobStatusRetriever.NA_KEY)
+ && currentStatus.equals(ExecutionStatus.PENDING_RESUME.name());
+ }
+
private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);