phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1847701708


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java:
##########
@@ -20,8 +20,15 @@
 /**
  * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
  */
-public class LeaseUnavailableException extends RuntimeException {
-  public LeaseUnavailableException(String message) {
+public class TooSoonToRerunSameFlowException extends RuntimeException {
+  private final FlowSpec flowSpec;
+
+  public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) {
     super(message);
+    this.flowSpec = flowSpec;
+  }
+
+  public FlowSpec getFlowSpec() {
+    return flowSpec;
   }

Review Comment:
   use the `@Getter` annotation (from `lombok`) instead of a hand-written getter



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -29,6 +29,7 @@
 import java.util.Properties;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;

Review Comment:
   gobblin's own imports belong farther down, around L55



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole
 
   /**
    * This method checks if lease can be acquired on provided flow in lease params
-   * returns true if entry for the same flow does not exists within epsilon time
+   * returns true if entry for the same flow does not exists within Lease Consolidation Period

Review Comment:
   sense is reversed here...
   
   maybe:
   > Check whether the same flowGroup+flowName is within the Lease Consolidation Period (aka. epsilon) from other, unrelated leasing activity
   
   this is also out-of-date:
   ```
   @return true if lease can be acquired on the flow passed in the lease params, false otherwise
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
 
   /**
    * Returns true if lease can be acquired on entity provided in leaseParams.
-   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action was triggered,
-   *                      and if the dag action event we're checking on is a reminder event
+   * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action

Review Comment:
   javadoc seems out-of-date, esp. mentioning LeaseParams and DagAction
   
   also, out-of-date:
   ```
   Returns true if lease can be acquired on entity provided in leaseParams.
   ```



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -257,10 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
       responseMap = this.flowCatalog.put(flowSpec, true);
     } catch (QuotaExceededException e) {
       throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
-    } catch(LeaseUnavailableException e){
-      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
-    }
-    catch (Throwable e) {
+    } catch(TooSoonToRerunSameFlowException e) {
+      return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
+          "FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken"));

Review Comment:
   nit: "was **previously** launched within"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
 
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {

Review Comment:
   this doesn't enforce the existence, but rather the **non-**existence.
   
   also, "similar lease" generally sounds apt, if we consider "similar" to be 
**the same** FlowId, but different executionId.  hence the lease is similar, 
while "the flow" is actually... **the same**.
   
   this line of reasoning leaves "similar flow" sounding imprecise at best and 
confusing at worst.  I regret suggesting it and apologize for that.  (it stands 
out more clearly when I'm solely reading vs. struggling to originate a name 
myself.)
   
   really, we're talking about recent execution of the same FlowId (aka. 
flowGroup+flowName).  maybe `enforceNoRecentAdhocExecOfSameFlow`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
 
   /**
    * Returns true if lease can be acquired on entity provided in leaseParams.
-   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action was triggered,
-   *                      and if the dag action event we're checking on is a reminder event
+   * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @throws IOException
    */
-  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
+  boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;

Review Comment:
   how about `existsCurrentlyLaunchingExecOfSameFlow`?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -392,7 +393,12 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
       // If flow fails compilation, the result will have a non-empty string with the error
       if (!response.getValue().getFailures().isEmpty()) {
         for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
-          throw entry.getValue().getError().getCause();
+          Throwable error = entry.getValue().getError();
+          if (error instanceof TooSoonToRerunSameFlowException) {
+            throw (TooSoonToRerunSameFlowException) error;
+          } else {
+            throw error.getCause();
+          }

Review Comment:
   I'm not crazy about having to explicitly carve out a special case for this 
exception.  couldn't we instead, when throwing it in the first place, wrap it 
in an extra `RuntimeException` that we know will be stripped off here?
   
   (if doing that, be sure to add a comment explaining it's for the 
`SpecCatalogListener` `CallbackResult` handling over here.)
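
A minimal sketch of the wrapping idea above, under stated assumptions: `TooSoonException` is a hypothetical stand-in for `TooSoonToRerunSameFlowException`, and `unwrap` stands in for the `error.getCause()` call in the `FlowCatalog` failure handling. It is not the PR's actual code, only an illustration that a wrapper `RuntimeException` lets the handler stay uniform.

```java
public class WrappedExceptionSketch {
  /** Hypothetical stand-in for TooSoonToRerunSameFlowException. */
  static class TooSoonException extends RuntimeException {
    TooSoonException(String message) {
      super(message);
    }
  }

  /** Thrower side: wrap the real exception so the handler's getCause() yields it. */
  static void enforceNoRecentExec(boolean recentExecExists) {
    if (recentExecExists) {
      // The outer RuntimeException exists only to be stripped off by the
      // callback-failure handling (modeled by unwrap() below).
      throw new RuntimeException(new TooSoonException("too soon to rerun"));
    }
  }

  /** Handler side: always take the cause, with no instanceof special case. */
  static Throwable unwrap(Throwable error) {
    return error.getCause();
  }

  public static void main(String[] args) {
    try {
      enforceNoRecentExec(true);
    } catch (RuntimeException wrapper) {
      Throwable cause = unwrap(wrapper);
      System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
  }
}
```

The trade-off is that the wrapping is invisible at the throw site unless commented, which is why the explanatory comment matters.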



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -81,6 +82,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
       new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
   private static final DagActionStore.LeaseParams
       launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis);
+  private static final DagActionStore.DagAction launchDagAction3_similar =
+      new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams3_similar = new DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis);
+  private static final DagActionStore.DagAction launchDagAction4_similar =
+      new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams4_similar = new DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis);

Review Comment:
   nit: so we can easily surmise what's different between `launchDA3` and 
`launchDA3_similar`, please put the 3s and the 4s next to each other.
   
   also, the two-step init is very clunky with an extra name plus one more line 
of boiler-plate.  in cases where the `DagAction` is merely used to init 
`LeaseParams`, skip creating a separate name for the `DagAction`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -59,6 +59,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
   private static final long flowExecutionId = 12345677L;
+  private static final long flowExecutionId1 = 12345996L;

Review Comment:
   I never noticed before that `flowExecutionId`, which is customarily
millis-since-epoch, only has 8 digits when it should have 13.  let's fix that
and also define this as:
   ```
   private static final long flowExecutionIdAlt = flowExecutionId + ...; // whatever you consider a reasonable (later) offset
   ```



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -96,13 +95,19 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
   }
 
   @Test
-  public void testcanAcquireLeaseOnEntity() throws Exception{
-    Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+  public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{
+    Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
     String flowName = "testFlow";
     String flowGroup = "testGroup";
-    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
-    DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
-    Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
+    Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class)));

Review Comment:
   apologies, but I'm not actually familiar w/ what `any(Long.class)` means in 
a context like this.  I'm familiar w/ such arg matchers setting-up mocking and 
also to verify prior invocations of a mock - but not within an actual 
invocation of a non-mock.
   
   guessing: does it choose a random long and pass that as the arg?  it might 
be better to just pass an actual value, such as `System.currentTimeMillis()`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -370,16 +376,18 @@ public void onAddSpecForScheduledFlow() throws IOException {
     FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
     AddSpecResponse response = new AddSpecResponse<>(new Object());
     Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
-    AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+    AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
     Assert.assertNotNull(addSpecResponse);
+    // Verifying that for scheduled flow isLeaseAcquirable is not called
+    Mockito.verify(dagManagementStateStore, Mockito.times(0)).existsCurrentlyLaunchingSimilarFlow(anyString(), anyString(), anyLong());

Review Comment:
   comment names wrong method
   
   also, use `Mockito.never()`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
 
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
     if (!flowSpec.isScheduled()) {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
-      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
-          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
-      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
-      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);

Review Comment:
   "existing adhoc flow existence".... did you mean "existing adhoc flow 
execution"?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
 
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
     if (!flowSpec.isScheduled()) {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
-      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
-          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
-      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
-      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);
       try {
-        if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
-          throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+        if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
+          throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec);

Review Comment:
   1. we have an `.info` line above announcing the check, so let's follow w/ a 
`.warn` line here when the check fails.  suggest: "another recent adhoc flow 
exec found for...."
   
   2. exception msg could benefit from minor improvements, yet--however it's 
phrased--it belongs encapsulated in the `TooSoonToRerun...` ctor, which should 
take solely a `FlowSpec` param
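
For point 2, a rough sketch of a constructor that takes only the flow spec and builds the message itself. `FlowSpecStub` is a hypothetical stand-in for Gobblin's `FlowSpec` (only a URI is modeled), and the message wording is illustrative rather than a proposal for the final text.

```java
import java.net.URI;

public class TooSoonCtorSketch {
  /** Hypothetical minimal stand-in for FlowSpec; only the URI is modeled. */
  static class FlowSpecStub {
    private final URI uri;

    FlowSpecStub(URI uri) {
      this.uri = uri;
    }

    URI getUri() {
      return uri;
    }
  }

  /** The message is composed inside the ctor, so callers pass only the spec. */
  static class TooSoonToRerunSameFlowException extends RuntimeException {
    private final FlowSpecStub flowSpec;

    TooSoonToRerunSameFlowException(FlowSpecStub flowSpec) {
      // Illustrative wording only; the real phrasing is up to the PR author.
      super("Flow " + flowSpec.getUri() + " was executed too recently to rerun");
      this.flowSpec = flowSpec;
    }

    FlowSpecStub getFlowSpec() {
      return flowSpec;
    }
  }

  public static void main(String[] args) {
    FlowSpecStub spec = new FlowSpecStub(URI.create("/testGroup/testFlow"));
    RuntimeException e = new TooSoonToRerunSameFlowException(spec);
    System.out.println(e.getMessage());
  }
}
```

Keeping the message in one place means every throw site reports the condition the same way.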


