This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e8371f5ad [GOBBLIN-2111] Delete adhoc flowSpecs in DagProcEngine 
(#4002)
5e8371f5ad is described below

commit 5e8371f5ad7482ce34bec9eb62fb60ea87c4bddb
Author: umustafi <[email protected]>
AuthorDate: Thu Jul 18 18:03:12 2024 -0700

    [GOBBLIN-2111] Delete adhoc flowSpecs in DagProcEngine (#4002)
    
    * Delete adhoc flowSpec after LaunchDagTask.conclude to only delete after 
finishing the DagProc
---
 .../modules/orchestration/task/DagTask.java        |  4 +-
 .../modules/orchestration/task/LaunchDagTask.java  | 30 ++++++++-
 .../orchestration/DagProcessingEngineTest.java     |  3 +-
 .../orchestration/proc/LaunchDagProcTest.java      | 13 +++-
 .../orchestration/task/LaunchDagTaskTest.java      | 73 ++++++++++++++++++++++
 5 files changed, 116 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index b01860a9e2..df62eb971c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -40,8 +40,8 @@ import 
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 @Slf4j
 public abstract class DagTask {
   @Getter public final DagActionStore.DagAction dagAction;
+  protected final DagManagementStateStore dagManagementStateStore;
   private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
-  private final DagManagementStateStore dagManagementStateStore;
 
   public DagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
       DagManagementStateStore dagManagementStateStore) {
@@ -57,7 +57,7 @@ public abstract class DagTask {
    * work on this task, is done in this method.
    * Returns true if concluding dag task finished successfully otherwise false.
    */
-  public final boolean conclude() {
+  public boolean conclude() {
     try {
       this.dagManagementStateStore.deleteDagAction(this.dagAction);
       return this.leaseObtainedStatus.completeLease();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 8a726b2dc8..909763a0cd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -17,8 +17,15 @@
 
 package org.apache.gobblin.service.modules.orchestration.task;
 
+import java.net.URISyntaxException;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
@@ -26,7 +33,7 @@ import 
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 /**
  * A {@link DagTask} responsible to handle launch tasks.
  */
-
+@Slf4j
 public class LaunchDagTask extends DagTask {
   public LaunchDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
       DagManagementStateStore dagManagementStateStore) {
@@ -36,4 +43,25 @@ public class LaunchDagTask extends DagTask {
   public <T> T host(DagTaskVisitor<T> visitor) {
     return visitor.meet(this);
   }
+
+  @Override
+  public final boolean conclude() {
+    try {
+      // Remove adhoc flow specs after the adhoc job is launched and marked as 
completed
+      if (super.conclude()) {
+        DagManager.DagId dagId = 
DagManagerUtils.generateDagId(this.dagAction.getFlowGroup(),
+            this.dagAction.getFlowName(), this.dagAction.getFlowExecutionId());
+        FlowSpec flowSpec =
+            
this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId()));
+        if (!flowSpec.isScheduled()) {
+          dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new 
Properties(), false);
+        }
+        return true;
+      }
+    } catch (SpecNotFoundException | URISyntaxException e) {
+      log.error("Unable to retrieve flowSpec to delete from flowCatalog if 
adhoc.");
+      throw new RuntimeException(e);
+    }
+    return false;
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 06982c5f18..baf4bfabbb 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -54,6 +54,7 @@ public class DagProcessingEngineTest {
   private DagManagementTaskStreamImpl dagManagementTaskStream;
   private DagTaskStream dagTaskStream;
   private DagProcFactory dagProcFactory;
+  // Field is static because it's used to instantiate every MockedDagTask
   private static MySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testMetastoreDatabase;
   static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@@ -128,7 +129,7 @@ public class DagProcessingEngineTest {
     private final boolean isBad;
 
     public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
-      super(dagAction, leaseObtainedStatus, dagManagementStateStore);
+      super(dagAction, leaseObtainedStatus, 
DagProcessingEngineTest.dagManagementStateStore);
       this.isBad = isBad;
     }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 1b8c7da4ca..16c1bf8ffe 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.typesafe.config.Config;
@@ -59,9 +60,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.util.ConfigUtils;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -75,6 +74,13 @@ public class LaunchDagProcTest {
   @BeforeClass
   public void setUp() throws Exception {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  /**
+   * Reset DagManagementStateStore between tests so that Mockito asserts are 
done on a fresh state.
+   */
+  @BeforeMethod
+  public void resetDMSS() throws Exception {
     this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
@@ -180,6 +186,7 @@ public class LaunchDagProcTest {
 
   public static void mockDMSSCommonBehavior(DagManagementStateStore 
dagManagementStateStore) throws IOException, SpecNotFoundException {
     
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(dagManagementStateStore).removeFlowSpec(any(), any(), 
anyBoolean());
     doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
     doReturn(true).when(dagManagementStateStore).releaseQuota(any());
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
new file mode 100644
index 0000000000..0a057b7457
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTaskTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import org.testng.annotations.BeforeClass;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static 
org.apache.gobblin.service.modules.orchestration.OrchestratorTest.*;
+import static org.mockito.ArgumentMatchers.*;
+
+
+public class LaunchDagTaskTest {
+  DagActionStore.DagAction dagAction;
+  LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
+  DagManagementStateStore dagManagementStateStore;
+
+  @BeforeClass
+  public void setUp() throws URISyntaxException, SpecNotFoundException, 
IOException {
+    String flowGroup = "group1";
+    dagAction = new DagActionStore.DagAction(flowGroup, "name1",
+        1L, "", DagActionStore.DagActionType.LAUNCH);
+
+    MultiActiveLeaseArbiter mockedLeaseArbiter = 
Mockito.mock(MultiActiveLeaseArbiter.class);
+    
Mockito.when(mockedLeaseArbiter.recordLeaseSuccess(any())).thenReturn(true);
+
+    leaseObtainedStatus = new LeaseAttemptStatus.LeaseObtainedStatus(
+        new DagActionStore.LeaseParams(dagAction, true, 1),
+        0, 5, mockedLeaseArbiter);
+    dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+
+    FlowId flowId = 
GobblinServiceManagerTest.createFlowIdWithUniqueName(flowGroup);
+    FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
+    
Mockito.when(dagManagementStateStore.getFlowSpec(any())).thenReturn(adhocSpec);
+  }
+
+  /*
+  Validate that after concluding an adhoc flow its flowSpec will be removed
+   */
+  @Test
+  public void concludeRemovesAdhocFlowSpec() throws IOException {
+    LaunchDagTask dagTask = new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagManagementStateStore);
+    dagTask.conclude();
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(any());
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).removeFlowSpec(any(), any(), anyBoolean());
+  }
+}

Reply via email to