khandelwal-prateek commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1844070218


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, 
boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease 
params
+   * returns true if entry for the same flow does not exists within epsilon 
time
+   * in leaseArbiterStore
+   * @param leaseParams   uniquely identifies the flow, the present action 
upon it, the time the action
+   *                      was triggered, and if the dag action event we're 
checking on is a reminder event
+   */

Review Comment:
   Please add `@return` in javadocs 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, 
boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease 
params
+   * returns true if entry for the same flow does not exists within epsilon 
time
+   * in leaseArbiterStore
+   * @param leaseParams   uniquely identifies the flow, the present action 
upon it, the time the action
+   *                      was triggered, and if the dag action event we're 
checking on is a reminder event
+   */
+  boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams)

Review Comment:
   consider renaming this to `isLeaseAcquirable` for conciseness and to be 
consistent with other method names



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+    if (!flowSpec.isScheduled()) {
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = 
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      DagActionStore.DagAction dagAction = 
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), 
DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams leaseParams = new 
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+      try {
+        if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) {
+          throw new LeaseUnavailableException("Lease already occupied by 
another execution of this flow");

Review Comment:
   add an info log here with `flowGroup`, `flowName`.. it would be useful in 
debugging



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  @Override
+  public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams 
leaseParams) throws IOException {

Review Comment:
   Please add javadoc.. something like:
   `Determines if a lease can be acquired for the given flow. A lease is 
acquirable if ...`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details 
within
+     epsilon time, then we do not execute this flow, hence do not process and 
store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException 
{
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    
Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+    dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+  }
+
+  @Test
+  public void testOnAddSpec_withFlowSpec_Available() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();

Review Comment:
   can we use `this.flowSpec` here since it already has the schedule?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details 
within
+     epsilon time, then we do not execute this flow, hence do not process and 
store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException 
{

Review Comment:
   let's also test the scenario when `canAcquireLeaseOnEntity` returns `true` 
for adhoc flow since the other test is for scheduled flow



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {

Review Comment:
   add javadoc



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details 
within
+     epsilon time, then we do not execute this flow, hence do not process and 
store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException 
{
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    
Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+    dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+  }
+
+  @Test
+  public void testOnAddSpec_withFlowSpec_Available() throws IOException {

Review Comment:
   use camelcase for naming methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to