This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5e8371f5ad [GOBBLIN-2111] Delete adhoc flowSpecs in DagProcEngine (#4002)
5e8371f5ad is described below
commit 5e8371f5ad7482ce34bec9eb62fb60ea87c4bddb
Author: umustafi <[email protected]>
AuthorDate: Thu Jul 18 18:03:12 2024 -0700
[GOBBLIN-2111] Delete adhoc flowSpecs in DagProcEngine (#4002)
* Delete the adhoc flowSpec inside LaunchDagTask.conclude, after the base
DagTask.conclude succeeds, so that it is removed only once the DagProc has finished
---
.../modules/orchestration/task/DagTask.java | 4 +-
.../modules/orchestration/task/LaunchDagTask.java | 30 ++++++++-
.../orchestration/DagProcessingEngineTest.java | 3 +-
.../orchestration/proc/LaunchDagProcTest.java | 13 +++-
.../orchestration/task/LaunchDagTaskTest.java | 73 ++++++++++++++++++++++
5 files changed, 116 insertions(+), 7 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index b01860a9e2..df62eb971c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -40,8 +40,8 @@ import
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@Slf4j
public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
+ protected final DagManagementStateStore dagManagementStateStore;
private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
- private final DagManagementStateStore dagManagementStateStore;
public DagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
DagManagementStateStore dagManagementStateStore) {
@@ -57,7 +57,7 @@ public abstract class DagTask {
* work on this task, is done in this method.
* Returns true if concluding dag task finished successfully otherwise false.
*/
- public final boolean conclude() {
+ public boolean conclude() {
try {
this.dagManagementStateStore.deleteDagAction(this.dagAction);
return this.leaseObtainedStatus.completeLease();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 8a726b2dc8..909763a0cd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -17,8 +17,15 @@
package org.apache.gobblin.service.modules.orchestration.task;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
@@ -26,7 +33,7 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
/**
* A {@link DagTask} responsible to handle launch tasks.
*/
-
+@Slf4j
public class LaunchDagTask extends DagTask {
public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
DagManagementStateStore dagManagementStateStore) {
@@ -36,4 +43,25 @@ public class LaunchDagTask extends DagTask {
public <T> T host(DagTaskVisitor<T> visitor) {
return visitor.meet(this);
}
+
+ @Override
+ public final boolean conclude() {
+ try {
+ // Remove adhoc flow specs after the adhoc job is launched and marked as
completed
+ if (super.conclude()) {
+ DagManager.DagId dagId =
DagManagerUtils.generateDagId(this.dagAction.getFlowGroup(),
+ this.dagAction.getFlowName(), this.dagAction.getFlowExecutionId());
+ FlowSpec flowSpec =
+
this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId()));
+ if (!flowSpec.isScheduled()) {
+ dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new
Properties(), false);
+ }
+ return true;
+ }
+ } catch (SpecNotFoundException | URISyntaxException e) {
+ log.error("Unable to retrieve flowSpec to delete from flowCatalog if
adhoc.");
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 06982c5f18..baf4bfabbb 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -54,6 +54,7 @@ public class DagProcessingEngineTest {
private DagManagementTaskStreamImpl dagManagementTaskStream;
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
+ // Field is static because it's used to instantiate every MockedDagTask
private static MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testMetastoreDatabase;
static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@@ -128,7 +129,7 @@ public class DagProcessingEngineTest {
private final boolean isBad;
public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
- super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+ super(dagAction, leaseObtainedStatus,
DagProcessingEngineTest.dagManagementStateStore);
this.isBad = isBad;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 1b8c7da4ca..16c1bf8ffe 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.typesafe.config.Config;
@@ -59,9 +60,7 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.util.ConfigUtils;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -75,6 +74,13 @@ public class LaunchDagProcTest {
@BeforeClass
public void setUp() throws Exception {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ /**
+ * Reset DagManagementStateStore between tests so that Mockito asserts are
done on a fresh state.
+ */
+ @BeforeMethod
+ public void resetDMSS() throws Exception {
this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
mockDMSSCommonBehavior(this.dagManagementStateStore);
}
@@ -180,6 +186,7 @@ public class LaunchDagProcTest {
public static void mockDMSSCommonBehavior(DagManagementStateStore
dagManagementStateStore) throws IOException, SpecNotFoundException {
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+ doNothing().when(dagManagementStateStore).removeFlowSpec(any(), any(),
anyBoolean());
doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
doReturn(true).when(dagManagementStateStore).releaseQuota(any());
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
new file mode 100644
index 0000000000..0a057b7457
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import org.testng.annotations.BeforeClass;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static
org.apache.gobblin.service.modules.orchestration.OrchestratorTest.*;
+import static org.mockito.ArgumentMatchers.*;
+
+
+public class LaunchDagTaskTest {
+ DagActionStore.DagAction dagAction;
+ LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
+ DagManagementStateStore dagManagementStateStore;
+
+ @BeforeClass
+ public void setUp() throws URISyntaxException, SpecNotFoundException,
IOException {
+ String flowGroup = "group1";
+ dagAction = new DagActionStore.DagAction(flowGroup, "name1",
+ 1L, "", DagActionStore.DagActionType.LAUNCH);
+
+ MultiActiveLeaseArbiter mockedLeaseArbiter =
Mockito.mock(MultiActiveLeaseArbiter.class);
+
Mockito.when(mockedLeaseArbiter.recordLeaseSuccess(any())).thenReturn(true);
+
+ leaseObtainedStatus = new LeaseAttemptStatus.LeaseObtainedStatus(
+ new DagActionStore.LeaseParams(dagAction, true, 1),
+ 0, 5, mockedLeaseArbiter);
+ dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+
+ FlowId flowId =
GobblinServiceManagerTest.createFlowIdWithUniqueName(flowGroup);
+ FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
+
Mockito.when(dagManagementStateStore.getFlowSpec(any())).thenReturn(adhocSpec);
+ }
+
+ /*
+ Validate that after concluding an adhoc flow its flowSpec will be removed
+ */
+ @Test
+ public void concludeRemovesAdhocFlowSpec() throws IOException {
+ LaunchDagTask dagTask = new LaunchDagTask(dagAction, leaseObtainedStatus,
dagManagementStateStore);
+ dagTask.conclude();
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(any());
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).removeFlowSpec(any(), any(), anyBoolean());
+ }
+}