This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0ab5b33f3 [GOBBLIN-2113] Process Heartbeat DagAction CDC messages with
empty FlowExecutionId str (#4004)
0ab5b33f3 is described below
commit 0ab5b33f3125f503d59ffce8d281c7cc7aba1483
Author: umustafi <[email protected]>
AuthorDate: Tue Jul 16 18:22:41 2024 -0700
[GOBBLIN-2113] Process Heartbeat DagAction CDC messages with empty
FlowExecutionId str (#4004)
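* Process Heartbeat DagAction CDC messages with an empty flowExecutionId string: parse flowExecutionId to a long only after heartbeat messages have been filtered out, so empty strings no longer cause a parse exception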
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/DagActionStoreChangeMonitorTest.java | 14 +++++------
...gManagementDagActionStoreChangeMonitorTest.java | 8 +++---
.../modules/orchestration/Orchestrator.java | 3 +++
.../utils/FlowCompilationValidationHelper.java | 29 ++++++++++++++--------
.../monitoring/DagActionStoreChangeMonitor.java | 8 +++---
5 files changed, 37 insertions(+), 25 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 40c930ea4..a5de624ce 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -70,7 +70,7 @@ public class DagActionStoreChangeMonitorTest {
private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
- private final long FLOW_EXECUTION_ID = 123L;
+ private final String FLOW_EXECUTION_ID = "123456789";
private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
private int txidCounter = 0;
@@ -135,7 +135,7 @@ public class DagActionStoreChangeMonitorTest {
@Test
public void testProcessMessageWithHeartbeatAndNullDagAction() throws
SpecNotFoundException {
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
- wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "",
FLOW_EXECUTION_ID, null);
+ wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "",
null);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
@@ -150,7 +150,7 @@ public class DagActionStoreChangeMonitorTest {
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
public void testProcessMessageWithHeartbeatAndFlowInfo() throws
SpecNotFoundException {
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
- wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
+ wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, "",
"", DagActionValue.RESUME);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(),
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
@@ -251,12 +251,12 @@ public class DagActionStoreChangeMonitorTest {
* Util to create a general DagActionStoreChange type event
*/
private DagActionStoreChangeEvent
createDagActionStoreChangeEvent(OperationType operationType,
- String flowGroup, String flowName, long flowExecutionId, DagActionValue
dagAction) {
+ String flowGroup, String flowName, String flowExecutionId,
DagActionValue dagAction) {
String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter),
System.currentTimeMillis(), operationType);
txidCounter++;
- return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, String.valueOf(flowExecutionId),
+ return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, flowExecutionId,
DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
}
@@ -264,7 +264,7 @@ public class DagActionStoreChangeMonitorTest {
* Form a key for events using the flow identifiers
* @return a key formed by adding an '_' delimiter between the flow
identifiers
*/
- public static String getKeyForFlow(String flowGroup, String flowName, long
flowExecutionId) {
+ public static String getKeyForFlow(String flowGroup, String flowName, String
flowExecutionId) {
return flowGroup + "_" + flowName + "_" + flowExecutionId;
}
@@ -272,7 +272,7 @@ public class DagActionStoreChangeMonitorTest {
* Util to create wrapper around DagActionStoreChangeEvent
*/
private Kafka09ConsumerClient.Kafka09ConsumerRecord
wrapDagActionStoreChangeEvent(OperationType operationType,
- String flowGroup, String flowName, long flowExecutionId, DagActionValue
dagAction) {
+ String flowGroup, String flowName, String flowExecutionId,
DagActionValue dagAction) {
DagActionStoreChangeEvent eventToProcess = null;
try {
eventToProcess =
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
index 8d2b6bab9..3dbf0a961 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java
@@ -60,7 +60,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
private final int OFFSET = 1;
private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
- private final long FLOW_EXECUTION_ID = 123L;
+ private final String FLOW_EXECUTION_ID = "987654321";
private final String JOB_NAME = "jobName";
private MockDagManagementDagActionStoreChangeMonitor
mockDagManagementDagActionStoreChangeMonitor;
private int txidCounter = 0;
@@ -109,7 +109,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
public void testProcessMessageWithDelete() throws SchedulerException {
Kafka09ConsumerClient.Kafka09ConsumerRecord<String,
DagActionStoreChangeEvent> consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
DagActionValue.ENFORCE_JOB_START_DEADLINE);
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME,
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME,
Long.parseLong(FLOW_EXECUTION_ID), JOB_NAME,
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
/* TODO: skip deadline removal for now and let them fire
@@ -124,7 +124,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
* Util to create a general DagActionStoreChange type event
*/
private DagActionStoreChangeEvent
createDagActionStoreChangeEvent(OperationType operationType,
- String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionValue dagAction) {
+ String flowGroup, String flowName, String flowExecutionId, String
jobName, DagActionValue dagAction) {
String key = DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup,
flowName, flowExecutionId);
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter),
System.currentTimeMillis(), operationType);
@@ -137,7 +137,7 @@ public class DagManagementDagActionStoreChangeMonitorTest {
* Util to create wrapper around DagActionStoreChangeEvent
*/
private Kafka09ConsumerClient.Kafka09ConsumerRecord<String,
DagActionStoreChangeEvent> wrapDagActionStoreChangeEvent(
- OperationType operationType, String flowGroup, String flowName, long
flowExecutionId, String jobName, DagActionValue dagAction) {
+ OperationType operationType, String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionValue dagAction) {
DagActionStoreChangeEvent eventToProcess = null;
try {
eventToProcess =
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f53b156b7..04e12c13e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -290,6 +290,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
_log.warn("Flow: {} submitted to dagManager failed to compile and
produce a job execution plan dag", flowSpec);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
+ } catch (IOException | InterruptedException e) {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ throw e;
} finally {
this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 553fee271..b1bace478 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -112,20 +112,27 @@ public class FlowCompilationValidationHelper {
TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_COMPILED);
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
- validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName, flowMetadata);
- if (!jobExecutionPlanDagOptional.isPresent()) {
- return Optional.absent();
- }
+ try {
+ Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+ validateAndHandleConcurrentExecution(flowConfig, flowSpec,
flowGroup, flowName, flowMetadata);
- if (jobExecutionPlanDagOptional.get().isEmpty()) {
- populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
- return Optional.absent();
- }
+ if (!jobExecutionPlanDagOptional.isPresent()) {
+ return Optional.absent();
+ }
+
+ if (jobExecutionPlanDagOptional.get().isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
- flowCompilationTimer.stop(flowMetadata);
- return jobExecutionPlanDagOptional;
+ flowCompilationTimer.stop(flowMetadata);
+ return jobExecutionPlanDagOptional;
+ } catch (IOException e) {
+ log.error("Encountered exception when attempting to compile and perform
checks for flowGroup: {} flowName: {}",
+ flowGroup, flowName);
+ throw e;
+ }
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 3f2c86771..d9d0913bc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -199,7 +199,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
String operation =
value.getChangeEventIdentifier().getOperationType().name();
String flowGroup = value.getFlowGroup();
String flowName = value.getFlowName();
- long flowExecutionId = Long.parseLong(value.getFlowExecutionId());
+ String flowExecutionId = value.getFlowExecutionId();
String jobName = value.getJobName();
produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
@@ -219,12 +219,14 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
}
DagActionStore.DagActionType dagActionType =
DagActionStore.DagActionType.valueOf(value.getDagAction().toString());
+ // Parse flowExecutionIds after filtering out HB messages to prevent
exception from parsing empty strings
+ long flowExecutionIdLong = Long.parseLong(flowExecutionId);
// Used to easily log information to identify the dag action
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionIdLong, jobName,
dagActionType);
- handleDagAction(operation, dagAction, flowGroup, flowName,
flowExecutionId, dagActionType);
+ handleDagAction(operation, dagAction, flowGroup, flowName,
flowExecutionIdLong, dagActionType);
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}