This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9a516d3b8 [GOBBLIN-1935] Skip null dag action types unable to be
processed (#3807)
9a516d3b8 is described below
commit 9a516d3b8e832acb86acaf7b551e5fba13728531
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 24 17:04:54 2023 -0700
[GOBBLIN-1935] Skip null dag action types unable to be processed (#3807)
* Skip over null dag actions from malformed messages
* Add new metric for skipped messages
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java | 1 +
.../gobblin/service/monitoring/DagActionStoreChangeMonitor.java | 8 ++++++++
2 files changed, 9 insertions(+)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index dc4e26e91..86adb4671 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -48,6 +48,7 @@ public class RuntimeMetrics {
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT =
DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED =
DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 6855ca669..b05e73105 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -62,6 +62,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
private ContextAwareMeter messageFilteredOutMeter;
+ private ContextAwareMeter malformedMessagesSkippedMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
private volatile Long produceToConsumeDelayValue = -1L;
@@ -123,6 +124,12 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();
+ if (value.getDagAction() == null) {
+ log.warn("Skipping null dag action type received for flow group: {} name: {} executionId: {} tid: {} operation: "
+ + "{}", flowGroup, flowName, flowExecutionId, tid, operation);
+ this.malformedMessagesSkippedMeter.mark();
+ return;
+ }
DagActionStore.FlowActionType dagActionType =
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
@@ -225,6 +232,7 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
this.messageFilteredOutMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
+ this.malformedMessagesSkippedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED);
this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}