[ 
https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=943962&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-943962
 ]

ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Nov/24 17:13
            Start Date: 15/Nov/24 17:13
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1844070218


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time
+   * in leaseArbiterStore
+   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action
+   *                      was triggered, and if the dag action event we're checking on is a reminder event
+   */

Review Comment:
   Please add `@return` in javadocs 
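
A minimal sketch of what the requested `@return` tag could look like, assuming the semantics stated in the javadoc above (no entry for the same flow within epsilon time). `LeaseJavadocSketch`, its simplified `LeaseParams`, and `InMemoryLeaseArbiter` are hypothetical stand-ins for illustration and are not Gobblin classes; the in-memory map only stands in for the real lease arbiter store.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class LeaseJavadocSketch {

  /** Hypothetical, simplified stand-in for DagActionStore.LeaseParams. */
  static final class LeaseParams {
    final String flowGroup;
    final String flowName;
    final long eventTimeMillis;

    LeaseParams(String flowGroup, String flowName, long eventTimeMillis) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.eventTimeMillis = eventTimeMillis;
    }
  }

  interface LeaseArbiter {
    /**
     * Checks whether a lease can be acquired on the flow identified by the lease params.
     *
     * @param leaseParams identifies the flow and the time the action was triggered
     * @return {@code true} if no entry for the same flow exists within epsilon time
     *         in the lease store, {@code false} otherwise
     * @throws IOException if the lease store cannot be read
     */
    boolean canAcquireLeaseOnEntity(LeaseParams leaseParams) throws IOException;
  }

  /** Illustration only: keeps the latest event time per flow in memory. */
  static final class InMemoryLeaseArbiter implements LeaseArbiter {
    private final long epsilonMillis;
    private final Map<String, Long> lastEventTimeByFlow = new HashMap<>();

    InMemoryLeaseArbiter(long epsilonMillis) {
      this.epsilonMillis = epsilonMillis;
    }

    private static String key(LeaseParams params) {
      return params.flowGroup + "/" + params.flowName;
    }

    /** Records an entry for the flow, as if a lease had been taken at its event time. */
    void recordLease(LeaseParams params) {
      lastEventTimeByFlow.put(key(params), params.eventTimeMillis);
    }

    @Override
    public boolean canAcquireLeaseOnEntity(LeaseParams params) {
      Long last = lastEventTimeByFlow.get(key(params));
      // Acquirable when there is no entry, or the existing entry is older than epsilon.
      return last == null || Math.abs(params.eventTimeMillis - last) > epsilonMillis;
    }
  }
}
```

Spelling out both outcomes in `@return` lets callers read the contract without opening the implementation.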



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time
+   * in leaseArbiterStore
+   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action
+   *                      was triggered, and if the dag action event we're checking on is a reminder event
+   */
+  boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams)

Review Comment:
   Consider renaming this to `isLeaseAcquirable` for conciseness and for
consistency with other method names.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+    if (!flowSpec.isScheduled()) {
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+      try {
+        if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) {
+          throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");

Review Comment:
   Add an info log here with `flowGroup` and `flowName`; it would be useful
for debugging.
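
A rough sketch of the kind of log line this could be, under stated assumptions: `LeaseLoggingSketch`, `describe`, and the nested `LeaseUnavailableException` are hypothetical stand-ins, the lease check is reduced to a boolean parameter, and `java.util.logging` is used only to keep the example self-contained (the service code would use its own logger).

```java
import java.util.logging.Logger;

class LeaseLoggingSketch {
  private static final Logger LOG = Logger.getLogger(LeaseLoggingSketch.class.getName());

  /** Hypothetical stand-in for the PR's LeaseUnavailableException. */
  static class LeaseUnavailableException extends RuntimeException {
    LeaseUnavailableException(String message) {
      super(message);
    }
  }

  /** Builds the log message so that it names the flow that was rejected. */
  static String describe(String flowGroup, String flowName) {
    return String.format("Lease unavailable for adhoc flow: flowGroup=%s, flowName=%s",
        flowGroup, flowName);
  }

  /** Logs the flow identifiers at info level before rejecting the flow. */
  static void validateLeasability(String flowGroup, String flowName, boolean leaseAcquirable) {
    if (!leaseAcquirable) {
      LOG.info(describe(flowGroup, flowName));
      throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
    }
  }
}
```

Logging before the throw keeps the flow identifiers in the service log even if the caller only surfaces the exception message.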



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  @Override
+  public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException {

Review Comment:
   Please add javadoc, something like:
   `Determines if a lease can be acquired for the given flow. A lease is acquirable if ...`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details within
+     epsilon time, then we do not execute this flow, hence do not process and store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+    dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+  }
+
+  @Test
+  public void testOnAddSpec_withFlowSpec_Available() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();

Review Comment:
   can we use `this.flowSpec` here since it already has the schedule?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details within
+     epsilon time, then we do not execute this flow, hence do not process and store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {

Review Comment:
   Let's also test the scenario where `canAcquireLeaseOnEntity` returns `true`
for an adhoc flow, since the other test covers a scheduled flow.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {

Review Comment:
   add javadoc



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details within
+     epsilon time, then we do not execute this flow, hence do not process and store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+    dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+  }
+
+  @Test
+  public void testOnAddSpec_withFlowSpec_Available() throws IOException {

Review Comment:
   Use camelCase for method names.





Issue Time Tracking
-------------------

            Worklog Id:     (was: 943962)
    Remaining Estimate: 0h
            Time Spent: 10m

> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-2173
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Critical
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them in service restart/failover scenarios. However, it is expected
> that once these flows are kicked off/forwarded to the DagProcEngine, they
> are removed from our flowspec DB.
> This is currently not happening consistently; there seem to be some edge
> cases where they remain persisted in the DB. This can be fatal for users such
> as DIL that run adhoc flows using the same flowgroup/flowname consistently,
> which will lead to their flows being stuck. We need to find which edge cases
> are not handling the flow spec deletion properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
