[ 
https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944212
 ]

ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/24 01:13
            Start Date: 19/Nov/24 01:13
    Worklog Time Spent: 10m 
      Work Description: vsinghal85 commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1847504934


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -116,9 +124,10 @@ public void setUp() throws Exception {
     this.flowCatalog = new 
FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), 
Optional.of(logger), Optional.absent(), true);
 
     this.serviceLauncher.addService(flowCatalog);
-
+    MultiActiveLeaseArbiter leaseArbiter = 
Mockito.mock(MultiActiveLeaseArbiter.class);
     MySqlDagManagementStateStore dagManagementStateStore =
         
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    this.dagManagementStateStore=dagManagementStateStore;

Review Comment:
   done



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,60 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details 
within
+     epsilon time, then we do not execute this flow, hence do not process and 
store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+    dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+  }
+
+  /*
+   If no other flow has acquired lease within the epsilon time, then flow
+   compilation and addition to the store occurs normally
+ */
+  @Test
+  public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+    AddSpecResponse addSpecResponse = 
dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+    Assert.assertNotNull(addSpecResponse);
+  }
+
+  /*
+    For Scheduled flow lease acquirable check does not occur,
+    and flow compilation occurs successfully
+   */
+  @Test
+  public void onAddSpecForScheduledFlow() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    AddSpecResponse response = new AddSpecResponse<>(new Object());
+    Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);

Review Comment:
   Added the required verify statement.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 944212)
    Time Spent: 2h 20m  (was: 2h 10m)

> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-2173
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Critical
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to 
> persist them in service restart/failover scenarios. However, it is expected 
> that once these flows are kicked off/forwarded to the DagProcEngine, they 
> are removed from our flowspec DB.
> This is currently not happening consistently; there seem to be some edge 
> case(s) where they remain persisted in the DB. This can be fatal for users such 
> as DIL that run adhoc flows using the same flowgroup/flowname consistently, 
> which will lead to their flows being stuck. We need to find which edge cases 
> are not handling the flow spec deletion properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to