This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a516d3b8 [GOBBLIN-1935] Skip null dag action types unable to be 
processed (#3807)
9a516d3b8 is described below

commit 9a516d3b8e832acb86acaf7b551e5fba13728531
Author: umustafi <[email protected]>
AuthorDate: Tue Oct 24 17:04:54 2023 -0700

    [GOBBLIN-1935] Skip null dag action types unable to be processed (#3807)
    
    * Skip over null dag actions from malformed messages
    
    * Add new metric for skipped messages
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java   | 1 +
 .../gobblin/service/monitoring/DagActionStoreChangeMonitor.java   | 8 ++++++++
 2 files changed, 9 insertions(+)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index dc4e26e91..86adb4671 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -48,6 +48,7 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".messagesFilteredOut";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".malformedMessagedSkipped";
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = 
DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 6855ca669..b05e73105 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -62,6 +62,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   private ContextAwareMeter unexpectedErrors;
   private ContextAwareMeter messageProcessedMeter;
   private ContextAwareMeter messageFilteredOutMeter;
+  private ContextAwareMeter malformedMessagesSkippedMeter;
   private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
 
   private volatile Long produceToConsumeDelayValue = -1L;
@@ -123,6 +124,12 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
+    if (value.getDagAction() == null) {
+      log.warn("Skipping null dag action type received for flow group: {} 
name: {} executionId: {} tid: {} operation: "
+          + "{}", flowGroup, flowName, flowExecutionId, tid, operation);
+      this.malformedMessagesSkippedMeter.mark();
+      return;
+    }
     DagActionStore.FlowActionType dagActionType = 
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
 
     produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
@@ -225,6 +232,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
     this.messageFilteredOutMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGES_FILTERED_OUT);
+    this.malformedMessagesSkippedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MALFORMED_MESSAGES_SKIPPED);
     this.produceToConsumeDelayMillis = 
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
 () -> produceToConsumeDelayValue);
     this.getMetricContext().register(this.produceToConsumeDelayMillis);
   }

Reply via email to