This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 587972312 [GOBBLIN-2107] Delete adhoc flowSpecs from flowCatalog bug
fix (#3996)
587972312 is described below
commit 5879723122ed2be34131dcadbfaeb5acf36ee563
Author: umustafi <[email protected]>
AuthorDate: Wed Jul 10 12:07:16 2024 -0700
[GOBBLIN-2107] Delete adhoc flowSpecs from flowCatalog bug fix (#3996)
* Delete adhoc flowSpecs from the flowCatalog to avoid buildup
* If job compilation fails, ensure the quota is released and a flow
compilation event is sent in all cases
* Fix bug where adhoc flows are not deleted in the multi-active
scheduler case by re-adding deletion to
DagManager.addDag()
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/orchestration/Orchestrator.java | 39 ++++++++++++-------
.../utils/FlowCompilationValidationHelper.java | 5 +++
.../monitoring/DagActionStoreChangeMonitor.java | 8 ++--
.../modules/orchestration/OrchestratorTest.java | 44 +++++++++++++++++++++-
4 files changed, 78 insertions(+), 18 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 8ff5e2daf..f53b156b7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -212,9 +212,10 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup,
flowName);
-
- // only compile and pass directly to `DagManager` when multi-active NOT
enabled; otherwise recompilation to occur later,
- // once `DagActionStoreChangeMonitor` subsequently delegates this
`DagActionType.LAUNCH`
+ /* Only compile and pass directly to `DagManager` when multi-active
scheduler NOT enabled; otherwise
+ recompilation to occur later, once `DagActionStoreChangeMonitor`
subsequently delegates this
+ `DagActionType.LAUNCH`
+ */
if (flowLaunchHandler.isPresent()) {
DagActionStore.DagAction launchDagAction =
DagActionStore.DagAction.forFlow(
flowGroup,
@@ -230,7 +231,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
try {
- TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+ TimingEvent flowCompilationTimer =
+ new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_COMPILED);
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
@@ -259,7 +261,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
// Depending on if DagManager is present, handle execution
submitFlowToDagManager(flowSpec, compiledDag);
} finally {
- // remove from the flow catalog, regardless of whether the flow was
successfully validated and permitted to exec (concurrently)
+ /* Remove adhoc flow spec from the flow catalog, regardless of
whether the flow was successfully validated
+ and permitted to exec (concurrently)
+ */
this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
}
}
@@ -271,14 +275,23 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
- public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException,
InterruptedException {
- Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
- if (optionalJobExecutionPlanDag.isPresent()) {
- submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
- } else {
- _log.warn("Flow: {} submitted to dagManager failed to compile and
produce a job execution plan dag", flowSpec);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ /**
+ * Compiles the provided {@link FlowSpec} into a {@link
Dag<JobExecutionPlan>} and forwards that to the
+ * {@link DagManager} for execution. It's meant to be called by
+ * {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
+ */
+ public void compileAndSubmitFlowToDagManager(FlowSpec flowSpec) throws
IOException, InterruptedException {
+ try {
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
+ if (optionalJobExecutionPlanDag.isPresent()) {
+ submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+ } else {
+ _log.warn("Flow: {} submitted to dagManager failed to compile and
produce a job execution plan dag", flowSpec);
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ }
+ } finally {
+ this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 7ddfdb3ba..553fee271 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -140,7 +140,12 @@ public class FlowCompilationValidationHelper {
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
String.valueOf(this.isFlowConcurrencyEnabled)));
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ // Send FLOW_FAILED event
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Unable to compile
flowSpec to produce non-empty "
+ + "jobExecutionPlanDag.");
+ new TimingEvent(eventSubmitter,
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
return Optional.absent();
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index a179811a4..3f2c86771 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -315,14 +315,16 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
change. It's crucial to adopt the consensus flowExecutionId here to
prevent creating a new one during compilation.
*/
spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
- this.orchestrator.submitFlowToDagManager(spec);
+ this.orchestrator.compileAndSubmitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}",
flowId, e.getMessage());
launchSubmissionMetricProxy.markFailure();
return;
} catch (SpecNotFoundException e) {
- log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
- launchSubmissionMetricProxy.markFailure();
+ log.info("Spec not found for flowId {} due to deletion by active
dagManager host due to exception {}",
+ flowId, e.getMessage());
+ // TODO: mark this failure if there are other valid cases of this
exception
+ // launchSubmissionMetricProxy.markFailure();
return;
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} due to
exception {}", flowId, e.getMessage());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index a32a211fc..961eddaa2 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
@@ -25,6 +26,9 @@ import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -66,8 +70,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.*;
public class OrchestratorTest {
@@ -92,7 +95,9 @@ public class OrchestratorTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private FlowStatusGenerator mockFlowStatusGenerator;
private DagManager mockDagManager;
+ private FlowCompilationValidationHelper
mockedFlowCompilationValidationHelper;
private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
+ private Orchestrator mockedFlowCompValHelperBasedOrchestrator;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "quotas";
@@ -141,6 +146,19 @@ public class OrchestratorTest {
this.topologyCatalog, mockDagManager, Optional.of(logger),
mockFlowStatusGenerator,
Optional.absent(), sharedFlowMetricsSingleton,
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
flowCompilationValidationHelper);
+
+ /* Initialize a second orchestrator with a mocked
flowCompilationValidationHelper to use Mockito to spoof the dag
+ returned by a call to compile a flowSpec
+ */
+ this.mockedFlowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
+
when(mockedFlowCompilationValidationHelper.getSpecCompiler()).thenReturn(mock(SpecCompiler.class));
+ Mockito.doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+
+ this.mockedFlowCompValHelperBasedOrchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+ this.topologyCatalog, mockDagManager, Optional.of(logger),
mockFlowStatusGenerator,
+ Optional.absent(), sharedFlowMetricsSingleton,
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
+ this.mockedFlowCompilationValidationHelper);
+
this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
// Start application
@@ -447,6 +465,28 @@ public class OrchestratorTest {
Mockito.verify(this.mockDagManager,
Mockito.times(1)).removeFlowSpecIfAdhoc(any());
}
+
+ /**
+ * Tests that when compiling and forwarding a dagAction from
+ * {@link
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor#submitFlowToDagManagerHelper}
to the
+ * DagManager that {@link DagManager#removeFlowSpecIfAdhoc(FlowSpec)} is
called to ensure adhoc flowSpecs are deleted
+ * after compilation.
+ */
+ @Test
+ public void testDeleteFlowSpecCalledForMultiActivePath()
+ throws IOException, URISyntaxException, InterruptedException {
+ FlowId flowId =
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
+ FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
+ FlowSpec flowSpec1 = initFlowSpec();
+
+ Optional<Dag<JobExecutionPlan>> dag = Optional.of(
+ DagManagerTest.buildDag("0", 123L, "FINISH_RUNNING", false));
+
Mockito.when(this.mockedFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec1)).thenReturn(dag);
+ Mockito.doNothing().when(mockDagManager).removeFlowSpecIfAdhoc(flowSpec1);
+
this.mockedFlowCompValHelperBasedOrchestrator.compileAndSubmitFlowToDagManager(flowSpec1);
+ Mockito.verify(this.mockedFlowCompValHelperBasedOrchestrator.dagManager,
times(1)).removeFlowSpecIfAdhoc(any(FlowSpec.class));
+ }
+
public static FlowSpec createBasicFlowSpecForFlowId(FlowId flowId) throws
URISyntaxException {
return createBasicFlowSpecForFlowId(flowId, new Properties());
}