phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1847701708
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java:
##########
@@ -20,8 +20,15 @@
 /**
  * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
  */
-public class LeaseUnavailableException extends RuntimeException {
-  public LeaseUnavailableException(String message) {
+public class TooSoonToRerunSameFlowException extends RuntimeException {
+  private final FlowSpec flowSpec;
+
+  public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) {
     super(message);
+    this.flowSpec = flowSpec;
+  }
+
+  public FlowSpec getFlowSpec() {
+    return flowSpec;
   }

Review Comment:
instead, use the `@Getter` annotation (from `lombok`)

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -29,6 +29,7 @@
 import java.util.Properties;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;

Review Comment:
gobblin's own imports belong farther down, around L55.

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole
   /**
    * This method checks if lease can be acquired on provided flow in lease params
-   * returns true if entry for the same flow does not exists within epsilon time
+   * returns true if entry for the same flow does not exists within Lease Consolidation Period

Review Comment:
the sense is reversed here... maybe:
> Check whether the same flowGroup+flowName is within the Lease Consolidation Period (aka. epsilon) from other, unrelated leasing activity

this is also out-of-date:
```
@return true if lease can be acquired on the flow passed in the lease params, false otherwise
```

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
   /**
    * Returns true if lease can be acquired on entity provided in leaseParams.
-   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
-   * and if the dag action event we're checking on is a reminder event
+   * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action

Review Comment:
the javadoc seems out-of-date, esp. where it mentions LeaseParams and DagAction. this line is also out-of-date:
```
Returns true if lease can be acquired on entity provided in leaseParams.
```

##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -257,10 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
       responseMap = this.flowCatalog.put(flowSpec, true);
     } catch (QuotaExceededException e) {
       throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
-    } catch(LeaseUnavailableException e){
-      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
-    }
-    catch (Throwable e) {
+    } catch(TooSoonToRerunSameFlowException e) {
+      return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
+          "FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken"));

Review Comment:
nit: "was **previously** launched within"

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {

Review Comment:
this doesn't enforce the existence, but rather the **non-**existence.

also, "similar lease" generally sounds apt, if we consider "similar" to mean **the same** FlowId, but a different executionId. hence the lease is similar, while "the flow" is actually... **the same**. this line of reasoning leaves "similar flow" sounding imprecise at best and confusing at worst. I regret suggesting it and apologize for that. (it stands out more clearly when I'm solely reading vs. struggling to originate a name myself.)
really, we're talking about recent execution of the same FlowId (aka. flowGroup+flowName). maybe `enforceNoRecentAdhocExecOfSameFlow`?

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
   /**
    * Returns true if lease can be acquired on entity provided in leaseParams.
-   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
-   * and if the dag action event we're checking on is a reminder event
+   * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @throws IOException
    */
-  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
+  boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;

Review Comment:
how about `existsCurrentlyLaunchingExecOfSameFlow`?

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -392,7 +393,12 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
       // If flow fails compilation, the result will have a non-empty string with the error
       if (!response.getValue().getFailures().isEmpty()) {
         for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
-          throw entry.getValue().getError().getCause();
+          Throwable error = entry.getValue().getError();
+          if (error instanceof TooSoonToRerunSameFlowException) {
+            throw (TooSoonToRerunSameFlowException) error;
+          } else {
+            throw error.getCause();
+          }

Review Comment:
I'm not crazy about having to explicitly carve out a special case for this exception.
couldn't we instead, when throwing it in the first place, wrap it in an extra `RuntimeException` that we know will be stripped off here? (if doing that, be sure to add a comment explaining it's for the `SpecCatalogListener` `CallbackResult` handling over here.)

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -81,6 +82,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
       new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
   private static final DagActionStore.LeaseParams launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis);
+  private static final DagActionStore.DagAction launchDagAction3_similar =
+      new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams3_similar = new DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis);
+  private static final DagActionStore.DagAction launchDagAction4_similar =
+      new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams4_similar = new DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis);

Review Comment:
nit: so we can easily surmise what's different between `launchDA3` and `launchDA3_similar`, please put the 3s and the 4s next to each other.

also, the two-step init is very clunky, with an extra name plus one more line of boilerplate.
in cases where the `DagAction` is merely used to init `LeaseParams`, skip creating a separate name for the `DagAction`.

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -59,6 +59,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
   private static final long flowExecutionId = 12345677L;
+  private static final long flowExecutionId1 = 12345996L;

Review Comment:
I never noticed before that `flowExecutionId`, which is customarily millis-since-epoch, only has 8 digits when it should have 13. let's fix that and also define this as:
```
private static final long flowExecutionIdAlt = flowExecutionId + ...; // whatever you consider a reasonable (later) offset
```

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -96,13 +95,19 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
   }
   @Test
-  public void testcanAcquireLeaseOnEntity() throws Exception{
-    Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+  public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{
+    Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
     String flowName = "testFlow";
     String flowGroup = "testGroup";
-    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
-    DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
-    Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
+    Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class)));

Review Comment:
apologies, but I'm not actually familiar w/ what `any(Long.class)` means in a context like this. I'm familiar w/ such arg matchers when setting up mocks and when verifying prior invocations of a mock, but not within an actual invocation of a non-mock. guessing: does it choose a random long and pass that as the arg? it might be better to just pass an actual value, such as `System.currentTimeMillis()`.

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -370,16 +376,18 @@ public void onAddSpecForScheduledFlow() throws IOException {
     FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
     AddSpecResponse response = new AddSpecResponse<>(new Object());
     Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
-    AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+    AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
     Assert.assertNotNull(addSpecResponse);
+    // Verifying that for scheduled flow isLeaseAcquirable is not called
+    Mockito.verify(dagManagementStateStore, Mockito.times(0)).existsCurrentlyLaunchingSimilarFlow(anyString(), anyString(), anyLong());

Review Comment:
the comment names the wrong method. also, use `Mockito.never()`.

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
     if (!flowSpec.isScheduled()) {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
-          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
-      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
-      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);

Review Comment:
"existing adhoc flow existence".... did you mean "existing adhoc flow execution"?

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
     if (!flowSpec.isScheduled()) {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
-          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
-      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
-      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);
       try {
-        if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
-          throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+        if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
+          throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec);

Review Comment:
1. we have an `.info` line above announcing the check, so let's follow w/ a `.warn` line here when the check fails. suggest: "another recent adhoc flow exec found for...."
2. the exception msg could benefit from minor improvements, yet, however it's phrased, it belongs encapsulated in the `TooSoonToRerun...` ctor, which should take solely a `FlowSpec` param.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
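The last `Orchestrator` comment (a `.warn` on failure, message built inside a `FlowSpec`-only ctor) and the earlier `FlowCatalog` suggestion (wrap the exception so the `CallbackResult` handling strips it without a special case) fit together roughly as below. This is only a self-contained sketch under stated assumptions: `FlowSpec`, `RecentExecChecker`, and `rethrowCause` are hypothetical stand-ins rather than Gobblin's real classes, the check is simplified to flowGroup+flowName (no flowExecutionId), `java.util.logging` stands in for the real logger, the method names follow the renames proposed in the review, and it assumes `getError()` in `FlowCatalog` hands back exactly what the listener threw, so a single `getCause()` removes the wrapper.

```java
import java.util.logging.Logger;

/** Sketch only: FlowSpec, RecentExecChecker and rethrowCause are hypothetical stand-ins, not Gobblin classes. */
public class TooSoonToRerunSketch {

  /** Hypothetical stand-in for Gobblin's FlowSpec, reduced to what this sketch reads. */
  record FlowSpec(String uri, String flowGroup, String flowName, boolean scheduled) {}

  /** The ctor takes solely a FlowSpec and owns the wording of its message. */
  static class TooSoonToRerunSameFlowException extends RuntimeException {
    private final FlowSpec flowSpec;

    TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
      super("Another execution of flow " + flowSpec.flowGroup() + "." + flowSpec.flowName()
          + " was previously launched within the lease consolidation period (FlowSpec URI: "
          + flowSpec.uri() + ")");
      this.flowSpec = flowSpec;
    }

    // In Gobblin itself, Lombok's @Getter on the field would generate this accessor instead.
    FlowSpec getFlowSpec() {
      return flowSpec;
    }
  }

  /** Hypothetical stand-in for the DagManagementStateStore lookup (simplified: no flowExecutionId). */
  interface RecentExecChecker {
    boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName);
  }

  private static final Logger LOG = Logger.getLogger(TooSoonToRerunSketch.class.getName());

  static void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec, RecentExecChecker checker) {
    if (flowSpec.scheduled()) {
      return; // only adhoc flows are subject to this check
    }
    String flowId = flowSpec.flowGroup() + "." + flowSpec.flowName();
    LOG.info("checking for a recent adhoc flow execution of " + flowId);
    if (checker.existsCurrentlyLaunchingExecOfSameFlow(flowSpec.flowGroup(), flowSpec.flowName())) {
      LOG.warning("another recent adhoc flow exec found for " + flowId);
      // Extra RuntimeException layer: it exists only so that the SpecCatalogListener
      // CallbackResult handling in FlowCatalog, which rethrows getCause(), surfaces
      // TooSoonToRerunSameFlowException without needing an instanceof special case.
      throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
    }
  }

  /** Mimics `throw entry.getValue().getError().getCause();`, assuming getError() is the listener's own exception. */
  static void rethrowCause(Throwable error) throws Throwable {
    throw error.getCause();
  }
}
```

With this shape, the `FlowCatalog` loop can keep its original single `throw ...getError().getCause();` line, and the restli handler's `catch (TooSoonToRerunSameFlowException e)` receives the unwrapped exception along with its `FlowSpec`.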