This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d0135b064 [GOBBLIN-1652] Add more logging to KafkaJobStatusMonitor in
case it fails to process a GobblinTrackingEvent (#3513)
d0135b064 is described below
commit d0135b06447a5a456042db75a0d6d772fba9ab4b
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jun 14 13:21:01 2022 -0700
[GOBBLIN-1652] Add more logging to KafkaJobStatusMonitor in case it fails to
process a GobblinTrackingEvent (#3513)
* address comments
* use the ConnectionManager when the HttpClient is not closeable
* [GOBBLIN-1652] Add more logging to KafkaJobStatusMonitor in case it fails
to process a GobblinTrackingEvent
* add test
* fix test
Co-authored-by: Zihan Li <[email protected]>
---
.../runtime/KafkaAvroJobStatusMonitorTest.java | 30 +++++++
.../service/monitoring/KafkaJobStatusMonitor.java | 91 ++++++++++++----------
.../monitoring/MysqlJobStatusRetrieverTest.java | 2 +-
3 files changed, 80 insertions(+), 43 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index eb86ebfb1..fc5b87802 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -357,6 +358,35 @@ public class KafkaAvroJobStatusMonitorTest {
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
+ public void testProcessProgressingMessageWhenNoPreviousStatus() throws
IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic5");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createGTE(TimingEvent.JOB_COMPLETION_PERCENTAGE, new HashMap<>())
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor =
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty());
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
+ // Verify we are able to process it without NPE
+ Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+ }
+
@Test (dependsOnMethods =
"testProcessingRetriedForApparentlyTransientErrors")
public void testProcessMessageForCancelledAndKilledEvent() throws
IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic4");
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 363d5c744..272c94977 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -221,56 +221,63 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
@VisibleForTesting
static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore)
throws IOException {
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
- }
- if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
- jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
- }
- String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
- String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
- String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
- String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
- String storeName = jobStatusStoreName(flowGroup, flowName);
- String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
- List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
- org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
- String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
- // This is to make the change backward compatible as we may not have
this info in cluster events
- // If we does not have those info, we treat the event as coming from the
same attempts as previous one
- int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
- int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
- int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
- // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
- // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
- // And job status reflect the execution status in one attempt
- if (previousStatus != null && currentStatus != null &&
- (previousGeneration > currentGeneration
- || (previousGeneration == currentGeneration && previousAttempts
> currentAttempts)
- || (previousGeneration == currentGeneration && previousAttempts
== currentAttempts
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
- log.warn(String.format("Received status [generation.attempts] = %s
[%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, currentGeneration, currentAttempts, previousStatus,
previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId,
jobGroup, jobName));
- jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
- } else {
- jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ try {
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD))
{
+ jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
JobStatusRetriever.NA_KEY);
+ }
+ String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String storeName = jobStatusStoreName(flowGroup, flowName);
+ String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
+
+ List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
+ if (states.size() > 0) {
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ // This is to make the change backward compatible as we may not have
this info in cluster events
+ // If we does not have those info, we treat the event as coming from
the same attempts as previous one
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
previousGeneration);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
previousAttempts);
+ // We use three things to accurately count and thereby bound retries,
even amidst out-of-order events (by skipping late arrivals).
+ // The generation is monotonically increasing, while the attempts may
re-initialize back to 0. this two-part form prevents the composite value from
ever repeating.
+ // And job status reflect the execution status in one attempt
+ if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration || (
+ previousGeneration == currentGeneration && previousAttempts >
currentAttempts) || (previousGeneration == currentGeneration &&
previousAttempts == currentAttempts
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+ <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+ log.warn(String.format(
+ "Received status [generation.attempts] = %s [%s.%s] when already
%s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+ currentStatus, currentGeneration, currentAttempts,
previousStatus, previousGeneration, previousAttempts,
+ flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+ jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+ } else {
+ jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+ }
}
- }
- modifyStateIfRetryRequired(jobStatus);
- stateStore.put(storeName, tableName, jobStatus);
+ modifyStateIfRetryRequired(jobStatus);
+ stateStore.put(storeName, tableName, jobStatus);
+ } catch (Exception e) {
+ log.warn("Meet exception when adding jobStatus to state store at "
+ + e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
+ throw new IOException(e);
+ }
}
private static void
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts =
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
// SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event
- if
((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
+ if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD)
&&(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
||
state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())
||
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name())
&&
state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY))
) && currentAttempts < maxAttempts) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 765b526df..d226c8322 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -160,7 +160,7 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
try {
KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore());
} catch (IOException e) {
- Assert.assertTrue(e.getCause().getMessage().contains("Data too long"));
+ Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data
too long"));
return;
}
Assert.fail();