phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1846985471


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -103,6 +103,13 @@ public interface DagManagementStateStore {
    */
   void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
 
+  /**
+   * Returns true if lease can be acquired on entity provided in leaseParams.
+   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action was triggered,
+   *                      and if the dag action event we're checking on is a reminder event
+   */
+  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;

Review Comment:
   DMSS has no concept of leasing, since that's meant to be a lower-level impl 
detail.  accordingly let's avoid `LeaseParams` in this interface.
   
   given we already have this method:
   ```
   boolean existsFlowDagAction(String flowGroup, String flowName, long flowExecutionId, DagActionStore.DagActionType dagActionType)
       throws IOException, SQLException;
   ```
   how about this new one:
   ```
   boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException, SQLException;
   ```
   ?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
+ */
+public class LeaseUnavailableException extends RuntimeException {

Review Comment:
   this name is misleading in our current context, where nobody even tried to 
acquire any lease (**that case** is anyway already represented by 
`LeaseAttemptStatus.LeasedToAnotherStatus`).
   
   how about `WouldNotBeLeasableException` or 
`TooSoonToRerunSameFlowException`?  I prefer the latter, which clearly 
characterizes a restriction on using the API, whereas the former suggests impl. 
details, w/o specifically naming the problem (e.g. why not leasable?).



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
 
+  /*
+   test to verify if leasable entity is unavailable before epsilon time
+   to account for clock drift
+  */
+  @Test
+  public void testWhenLeasableEntityUnavailable() throws Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams3);
+    Thread.sleep(LESS_THAN_EPSILON);
+    Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));
+  }
+
+  /*
+     test to verify if leasable entity exists post epsilon time
+   */
+  @Test
+  public void testWhenLeasableEntityAvailable() throws Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams4);
+    Thread.sleep(MORE_THAN_EPSILON);
+    Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4));

Review Comment:
   same here



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -125,6 +128,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       _log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec);
       this.specCompiler.onAddSpec(addedSpec);
     } else if (addedSpec instanceof FlowSpec) {
+      validateAdhocFlowLeasability((FlowSpec) addedSpec);

Review Comment:
   nit: "validate"/"verify" are good for methods returning a boolean.  the 
entire purpose of this `void` method is to throw an exception.  clearly 
indicate that with stronger naming, like "failIf..." or "enforce..."
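
A small sketch of that naming distinction. Everything here is hypothetical (`FlowRerunGuard`, both method names, and the time-window rule are assumptions for illustration, not code from the PR): the boolean-returning method reads as a question, while the `void` method's name announces that it throws.

```java
// Hypothetical helper for illustration only; names and the time-window rule are assumptions.
final class FlowRerunGuard {
  // Answers a question, so a predicate-style name fits.
  static boolean isTooSoonToRerun(long lastLaunchMillis, long nowMillis, long minGapMillis) {
    return nowMillis - lastLaunchMillis < minGapMillis;
  }

  // Exists only to throw, so the name says so ("enforce...", "failIf...").
  static void enforceNotTooSoonToRerun(long lastLaunchMillis, long nowMillis, long minGapMillis) {
    if (isTooSoonToRerun(lastLaunchMillis, nowMillis, minGapMillis)) {
      throw new IllegalStateException(
          "flow relaunched within " + minGapMillis + " ms of the previous launch");
    }
  }
}
```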



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time
+   * in leaseArbiterStore, else returns false
+   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action
+   *                      was triggered, and if the dag action event we're checking on is a reminder event
+   * @return true if lease can be acquired on the flow passed in the lease params, false otherwise
+   */
+  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)

Review Comment:
   and apologies that I probably wasn't explaining clearly when earlier 
suggesting names like `existsLeasableEntity` (to mean that "**another one 
already** exists, historically")



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time
+   * in leaseArbiterStore, else returns false
+   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action
+   *                      was triggered, and if the dag action event we're checking on is a reminder event
+   * @return true if lease can be acquired on the flow passed in the lease params, false otherwise
+   */
+  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)

Review Comment:
   the method name itself suggests a pre-check capability (e.g. first check 
whether it's acquirable and if so, then `tryAcquireLease`... being assured of 
success).
   
   of course, because check-then-act patterns are susceptible to race 
conditions, we'd never actually provide such an API.  let's not confuse anyone!
   
   how about `boolean existsSimilarLeaseWithinConsolidationPeriod(LeaseParams)`?
   
   (or `existsEquivalentLeaseWithinConsolidationPeriod`)
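
To make the check-then-act race concrete, here is a minimal sketch. It is not Gobblin's arbiter: `ToyLeaseTable`, `isAcquirable`, and `tryAcquire` are hypothetical names, and an in-memory `ConcurrentHashMap` stands in for the real store. Two participants both get a "yes" from the pre-check, yet only one atomic attempt can succeed.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy lease table, for illustration only; not Gobblin's MultiActiveLeaseArbiter.
final class ToyLeaseTable {
  private final ConcurrentHashMap<String, String> ownerByFlow = new ConcurrentHashMap<>();

  // Pre-check: the answer may be stale by the time the caller acts on it.
  boolean isAcquirable(String flow) {
    return !ownerByFlow.containsKey(flow);
  }

  // Atomic attempt: the check and the write happen as one operation.
  boolean tryAcquire(String flow, String owner) {
    return ownerByFlow.putIfAbsent(flow, owner) == null;
  }
}

final class CheckThenActDemo {
  public static void main(String[] args) {
    ToyLeaseTable table = new ToyLeaseTable();
    boolean aSawFree = table.isAcquirable("group/flow"); // participant A checks
    boolean bSawFree = table.isAcquirable("group/flow"); // participant B checks before A acts
    boolean aWon = table.tryAcquire("group/flow", "A");
    boolean bWon = table.tryAcquire("group/flow", "B");  // B was told "acquirable", yet fails
    // prints "true true true false"
    System.out.println(aSawFree + " " + bSawFree + " " + aWon + " " + bWon);
  }
}
```

The pre-check result is only a hint about the recent past, which is why a name starting with `exists...` describes it more honestly than `isLeaseAcquirable`.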
   
   



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
       responseMap = this.flowCatalog.put(flowSpec, true);
     } catch (QuotaExceededException e) {
      throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
-    } catch (Throwable e) {
+    } catch(LeaseUnavailableException e){
+      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
+    }
+    catch (Throwable e) {

Review Comment:
   formatting



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
     return true;
   }
 
+  @Test
+  public void testcanAcquireLeaseOnEntity() throws Exception{

Review Comment:
   camel case typo... (but anyway, `canAcquireLeaseOnEntity` is not the name of 
the method)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time

Review Comment:
   very reasonable method-level javadoc... but it turns out `epsilon` is not 
mentioned anywhere in class-level javadoc, so this method description lacks 
context.
   
   so, please add the class-level info.  mentioning the name 'epsilon' is fine, 
but definitely also give it a more specific name, like "Lease Consolidation 
Period".



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
 
+  /*
+   test to verify if leasable entity is unavailable before epsilon time
+   to account for clock drift
+  */
+  @Test
+  public void testWhenLeasableEntityUnavailable() throws Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams3);
+    Thread.sleep(LESS_THAN_EPSILON);
+    Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));

Review Comment:
   the whole idea is that a "similar" (but NOT same) lease isn't itself already 
within epsilon.  hence, be sure to test `LeaseParams` that were NOT given to 
`tryAcquireLease`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  /*
+    Determines if a lease can be acquired for the given flow. A lease is acquirable if
+    no existing lease record exists in arbiter table or the record is older then epsilon time
+   */
+  @Override
+  public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
+    Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);
+    return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;

Review Comment:
   idiomatic:
   ```
   return infoResult.map(result -> !result.isWithinEpsilon()).orElse(true);
   ```
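
A self-contained sketch of the two forms side by side, assuming `infoResult` is a `java.util.Optional`, whose fallback method is `orElse` (with Guava's `Optional` the equivalents would be `transform` and `or`). `EventInfo` is a hypothetical stand-in for `GetEventInfoResult`.

```java
import java.util.Optional;

// Hypothetical stand-in for GetEventInfoResult; only the one accessor used here.
final class EventInfo {
  private final boolean withinEpsilon;

  EventInfo(boolean withinEpsilon) {
    this.withinEpsilon = withinEpsilon;
  }

  boolean isWithinEpsilon() {
    return withinEpsilon;
  }
}

final class OptionalIdiom {
  // Ternary form, as written in the diff.
  static boolean viaTernary(Optional<EventInfo> infoResult) {
    return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;
  }

  // Equivalent map/orElse form: an absent value falls through to the default.
  static boolean viaMap(Optional<EventInfo> infoResult) {
    return infoResult.map(result -> !result.isWithinEpsilon()).orElse(true);
  }
}
```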



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  /*
+    validates if lease can be acquired on the provided flowSpec,
+    else throw LeaseUnavailableException
+   */
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+    if (!flowSpec.isScheduled()) {
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      try {
+        if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
+          throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+        }
+      } catch (IOException exception) {
+        _log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception);

Review Comment:
   we called `dagManagementStateStore.isLeaseAcquirable(leaseParams)`... who 
said anything about "leaseArbiterTable"? :)
   
   (anyway, the table's name is dynamically set in config).
   
   instead:
   ```
   _log.error("unable to check whether lease acquirable " + leaseParams, exception);
   ```
   
   (also on the line below)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  /*
+    validates if lease can be acquired on the provided flowSpec,
+    else throw LeaseUnavailableException
+   */
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+    if (!flowSpec.isScheduled()) {
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);

Review Comment:
   keep it brief!  (we just made improvements in that vein 
https://github.com/apache/gobblin/pull/4074 )
   
   maybe:
   ```
    _log.info("checking adhoc lease acquirability {}", leaseParams);
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
+ */
+public class LeaseUnavailableException extends RuntimeException {
+  public LeaseUnavailableException(String message) {

Review Comment:
   beyond clearly naming for callers, impl-wise, this definitely relates to a 
flow, so that should be a ctor param.  consider whether to allow a catcher to 
reach in to access the details as instance member(s) or merely to use 
internally in the ctor, to contextualize the `message` passed along to `super`.
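
One possible shape, as a sketch only. The class name comes from the rename suggested elsewhere in this review; the constructor parameters, the getters, and the message wording are all assumptions. It shows both options at once: the flow identity contextualizes the message passed to `super`, and it is also kept as instance members for a catcher to read.

```java
// Sketch only: the constructor shape, getters, and message text are assumptions.
final class TooSoonToRerunSameFlowException extends RuntimeException {
  private final String flowGroup;
  private final String flowName;

  TooSoonToRerunSameFlowException(String flowGroup, String flowName) {
    // The flow identity contextualizes the message handed to super.
    super("Too soon to rerun flow " + flowGroup + "/" + flowName
        + "; a similar launch was accepted very recently");
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }

  // Exposed so a catcher (e.g. a REST handler) can read the details instead of parsing the message.
  String getFlowGroup() {
    return flowGroup;
  }

  String getFlowName() {
    return flowName;
  }
}
```

If no catcher needs the details, the fields and getters could be dropped and the parameters used only to build the message.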



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -116,9 +124,10 @@ public void setUp() throws Exception {
     this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true);
 
     this.serviceLauncher.addService(flowCatalog);
-
+    MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class);
     MySqlDagManagementStateStore dagManagementStateStore =
         spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    this.dagManagementStateStore=dagManagementStateStore;

Review Comment:
   spaces around `=`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
     return true;
   }
 
+  @Test
+  public void testcanAcquireLeaseOnEntity() throws Exception{
+    Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+    String flowName = "testFlow";
+    String flowGroup = "testGroup";
+    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
+    DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
+    Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));

Review Comment:
   where's the test to exercise `false` path?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,60 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details within
+     epsilon time, then we do not execute this flow, hence do not process and store the spec
+     and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+    dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+  }
+
+  /*
+   If no other flow has acquired lease within the epsilon time, then flow
+   compilation and addition to the store occurs normally
+ */
+  @Test
+  public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+    AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+    Assert.assertNotNull(addSpecResponse);
+  }
+
+  /*
+    For Scheduled flow lease acquirable check does not occur,
+    and flow compilation occurs successfully
+   */
+  @Test
+  public void onAddSpecForScheduledFlow() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    AddSpecResponse response = new AddSpecResponse<>(new Object());
+    Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);

Review Comment:
   don't you also need to add this mock:
   ```
   Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
   ```
   (w/ the expectation it would never be called)?
   
   alternatively, verify that mock was never invoked



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -86,6 +92,8 @@ public class OrchestratorTest {
   private FlowSpec flowSpec;
   private ITestMetastoreDatabase testMetastoreDatabase;
   private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;

Review Comment:
   this seems a misnomer, as we no longer have any `Orchestrator` capable of using the 
DagMgr (now completely removed) instead of `FlowLaunchHandler`.  suggest 
renaming it simply to `orchestrator`


