This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ab5b33f3 [GOBBLIN-2113] Process Heartbeat DagAction CDC messages with 
empty FlowExecutionId str (#4004)
0ab5b33f3 is described below

commit 0ab5b33f3125f503d59ffce8d281c7cc7aba1483
Author: umustafi <[email protected]>
AuthorDate: Tue Jul 16 18:22:41 2024 -0700

    [GOBBLIN-2113] Process Heartbeat DagAction CDC messages with empty 
FlowExecutionId str (#4004)
    
    * Process Heartbeat DagAction CDC messages with empty FlowExecutionId str
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/DagActionStoreChangeMonitorTest.java   | 14 +++++------
 ...gManagementDagActionStoreChangeMonitorTest.java |  8 +++---
 .../modules/orchestration/Orchestrator.java        |  3 +++
 .../utils/FlowCompilationValidationHelper.java     | 29 ++++++++++++++--------
 .../monitoring/DagActionStoreChangeMonitor.java    |  8 +++---
 5 files changed, 37 insertions(+), 25 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 40c930ea4..a5de624ce 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -70,7 +70,7 @@ public class DagActionStoreChangeMonitorTest {
 
   private final String FLOW_GROUP = "flowGroup";
   private final String FLOW_NAME = "flowName";
-  private final long FLOW_EXECUTION_ID = 123L;
+  private final String FLOW_EXECUTION_ID = "123456789";
   private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
   private int txidCounter = 0;
 
@@ -135,7 +135,7 @@ public class DagActionStoreChangeMonitorTest {
   @Test
   public void testProcessMessageWithHeartbeatAndNullDagAction() throws 
SpecNotFoundException {
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
-        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", 
FLOW_EXECUTION_ID, null);
+        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", 
null);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
     verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
     verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
@@ -150,7 +150,7 @@ public class DagActionStoreChangeMonitorTest {
   @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
   public void testProcessMessageWithHeartbeatAndFlowInfo() throws 
SpecNotFoundException {
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
-        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
+        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, "", 
"", DagActionValue.RESUME);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
     verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
     verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
@@ -251,12 +251,12 @@ public class DagActionStoreChangeMonitorTest {
    * Util to create a general DagActionStoreChange type event
    */
   private DagActionStoreChangeEvent 
createDagActionStoreChangeEvent(OperationType operationType,
-      String flowGroup, String flowName, long flowExecutionId, DagActionValue 
dagAction) {
+      String flowGroup, String flowName, String flowExecutionId, 
DagActionValue dagAction) {
     String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
     GenericStoreChangeEvent genericStoreChangeEvent =
         new GenericStoreChangeEvent(key, String.valueOf(txidCounter), 
System.currentTimeMillis(), operationType);
     txidCounter++;
-    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, String.valueOf(flowExecutionId),
+    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, flowExecutionId,
         DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
   }
 
@@ -264,7 +264,7 @@ public class DagActionStoreChangeMonitorTest {
    * Form a key for events using the flow identifiers
    * @return a key formed by adding an '_' delimiter between the flow 
identifiers
    */
-  public static String getKeyForFlow(String flowGroup, String flowName, long 
flowExecutionId) {
+  public static String getKeyForFlow(String flowGroup, String flowName, String 
flowExecutionId) {
     return flowGroup + "_" + flowName + "_" + flowExecutionId;
   }
 
@@ -272,7 +272,7 @@ public class DagActionStoreChangeMonitorTest {
    * Util to create wrapper around DagActionStoreChangeEvent
    */
   private Kafka09ConsumerClient.Kafka09ConsumerRecord 
wrapDagActionStoreChangeEvent(OperationType operationType,
-      String flowGroup, String flowName, long flowExecutionId, DagActionValue 
dagAction) {
+      String flowGroup, String flowName, String flowExecutionId, 
DagActionValue dagAction) {
     DagActionStoreChangeEvent eventToProcess = null;
     try {
       eventToProcess =
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
index 8d2b6bab9..3dbf0a961 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -60,7 +60,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
   private final int OFFSET = 1;
   private final String FLOW_GROUP = "flowGroup";
   private final String FLOW_NAME = "flowName";
-  private final long FLOW_EXECUTION_ID = 123L;
+  private final String FLOW_EXECUTION_ID = "987654321";
   private final String JOB_NAME = "jobName";
   private MockDagManagementDagActionStoreChangeMonitor 
mockDagManagementDagActionStoreChangeMonitor;
   private int txidCounter = 0;
@@ -109,7 +109,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
   public void testProcessMessageWithDelete() throws SchedulerException {
     Kafka09ConsumerClient.Kafka09ConsumerRecord<String, 
DagActionStoreChangeEvent> consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME, 
DagActionValue.ENFORCE_JOB_START_DEADLINE);
-    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, 
Long.parseLong(FLOW_EXECUTION_ID), JOB_NAME,
         DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
     
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
     /* TODO: skip deadline removal for now and let them fire
@@ -124,7 +124,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
    * Util to create a general DagActionStoreChange type event
    */
   private DagActionStoreChangeEvent 
createDagActionStoreChangeEvent(OperationType operationType,
-      String flowGroup, String flowName, long flowExecutionId, String jobName, 
DagActionValue dagAction) {
+      String flowGroup, String flowName, String flowExecutionId, String 
jobName, DagActionValue dagAction) {
     String key = DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup, 
flowName, flowExecutionId);
     GenericStoreChangeEvent genericStoreChangeEvent =
         new GenericStoreChangeEvent(key, String.valueOf(txidCounter), 
System.currentTimeMillis(), operationType);
@@ -137,7 +137,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
    * Util to create wrapper around DagActionStoreChangeEvent
    */
   private Kafka09ConsumerClient.Kafka09ConsumerRecord<String, 
DagActionStoreChangeEvent> wrapDagActionStoreChangeEvent(
-      OperationType operationType, String flowGroup, String flowName, long 
flowExecutionId, String jobName, DagActionValue dagAction) {
+      OperationType operationType, String flowGroup, String flowName, String 
flowExecutionId, String jobName, DagActionValue dagAction) {
     DagActionStoreChangeEvent eventToProcess = null;
     try {
       eventToProcess =
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f53b156b7..04e12c13e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -290,6 +290,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         _log.warn("Flow: {} submitted to dagManager failed to compile and 
produce a job execution plan dag", flowSpec);
         Instrumented.markMeter(this.flowOrchestrationFailedMeter);
       }
+    } catch (IOException | InterruptedException e) {
+      Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+      throw e;
     } finally {
       this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 553fee271..b1bace478 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -112,20 +112,27 @@ public class FlowCompilationValidationHelper {
 
     TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_COMPILED);
     Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-    Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
-        validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, 
flowName, flowMetadata);
 
-    if (!jobExecutionPlanDagOptional.isPresent()) {
-      return Optional.absent();
-    }
+    try {
+      Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+          validateAndHandleConcurrentExecution(flowConfig, flowSpec, 
flowGroup, flowName, flowMetadata);
 
-    if (jobExecutionPlanDagOptional.get().isEmpty()) {
-      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
-      return Optional.absent();
-    }
+      if (!jobExecutionPlanDagOptional.isPresent()) {
+        return Optional.absent();
+      }
+
+      if (jobExecutionPlanDagOptional.get().isEmpty()) {
+        populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+        return Optional.absent();
+      }
 
-    flowCompilationTimer.stop(flowMetadata);
-    return jobExecutionPlanDagOptional;
+      flowCompilationTimer.stop(flowMetadata);
+      return jobExecutionPlanDagOptional;
+    } catch (IOException e) {
+      log.error("Encountered exception when attempting to compile and perform 
checks for flowGroup: {} flowName: {}",
+          flowGroup, flowName);
+      throw e;
+    }
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 3f2c86771..d9d0913bc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -199,7 +199,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
     String operation = 
value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
-    long flowExecutionId = Long.parseLong(value.getFlowExecutionId());
+    String flowExecutionId = value.getFlowExecutionId();
     String jobName = value.getJobName();
 
     produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
@@ -219,12 +219,14 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
     }
 
     DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(value.getDagAction().toString());
+    // Parse flowExecutionIds after filtering out HB messages to prevent 
exception from parsing empty strings
+    long flowExecutionIdLong = Long.parseLong(flowExecutionId);
 
     // Used to easily log information to identify the dag action
-    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionIdLong, jobName,
         dagActionType);
 
-    handleDagAction(operation, dagAction, flowGroup, flowName, 
flowExecutionId, dagActionType);
+    handleDagAction(operation, dagAction, flowGroup, flowName, 
flowExecutionIdLong, dagActionType);
 
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }

Reply via email to