phet commented on code in PR #4076: URL: https://github.com/apache/gobblin/pull/4076#discussion_r1846985471
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java: ########## @@ -103,6 +103,13 @@ public interface DagManagementStateStore { */ void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException; + /** + * Returns true if lease can be acquired on entity provided in leaseParams. + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered, + * and if the dag action event we're checking on is a reminder event + */ + boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException; Review Comment: DMSS has no concept of leasing, since that's meant to be a lower-level impl detail. accordingly let's avoid `LeaseParams` in this interface. given we already have this method: ``` boolean existsFlowDagAction(String flowGroup, String flowName, long flowExecutionId, DagActionStore.DagActionType dagActionType) throws IOException, SQLException; ``` how about this new one: ``` boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException, SQLException; ``` ? ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +/** + * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. + */ +public class LeaseUnavailableException extends RuntimeException { Review Comment: this name is misleading in our current context, where nobody even tried to acquire any lease (**that case** is anyway already represented by `LeaseAttemptStatus.LeasedToAnotherStatus`). how about `WouldNotBeLeasableException` or `TooSoonToRerunSameFlowException`? I prefer the latter, which clearly characterizes a restriction on using the API, whereas the former suggests impl. details, w/o specifically naming the problem (e.g. why not leasable?). 
########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java: ########## @@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception { <= sixthObtainedStatus.getLeaseAcquisitionTimestamp()); } + /* + test to verify if leasable entity is unavailable before epsilon time + to account for clock drift + */ + @Test + public void testWhenLeasableEntityUnavailable() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams3); + Thread.sleep(LESS_THAN_EPSILON); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3)); + } + + /* + test to verify if leasable entity exists post epsilon time + */ + @Test + public void testWhenLeasableEntityAvailable() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams4); + Thread.sleep(MORE_THAN_EPSILON); + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4)); Review Comment: same here ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -125,6 +128,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { _log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec); this.specCompiler.onAddSpec(addedSpec); } else if (addedSpec instanceof FlowSpec) { + validateAdhocFlowLeasability((FlowSpec) addedSpec); Review Comment: nit: "validate"/"verify" are good for methods returning a boolean. the entire purpose of this `void` method is to throw an exception. clearly indicate that with stronger naming, like "failIf..." 
or "enforce..." ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java: ########## @@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time + * in leaseArbiterStore, else returns false + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + * @return true if lease can be acquired on the flow passed in the lease params, false otherwise + */ + boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) Review Comment: and apologies that I probably wasn't explaining clearly when earlier suggesting names like `existsLeasableEntity` (to mean that "**another one already** exists, historically") ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java: ########## @@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time + * in leaseArbiterStore, else returns false + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + * @return true if lease can be acquired on the flow passed in the lease params, false otherwise + */ + boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) 
Review Comment: the method name itself suggests a pre-check capability (e.g. first check whether it's acquirable and if so, then `tryAcquireLease`... being assured of success). of course, because check-then-act patterns are susceptible to race conditions, we'd never actually provide such an API. let's not confuse anyone! how about `boolean existsSimilarLeaseWithinConsolidationPeriod(LeaseParams)`? (or `existsEquivalentLeaseWithinConsolidationPeriod`) ########## gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java: ########## @@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr responseMap = this.flowCatalog.put(flowSpec, true); } catch (QuotaExceededException e) { throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } catch (Throwable e) { + } catch(LeaseUnavailableException e){ + throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage()); + } + catch (Throwable e) { Review Comment: formatting ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java: ########## @@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) { return true; } + @Test + public void testcanAcquireLeaseOnEntity() throws Exception{ Review Comment: camel case typo... 
(but anyway, `canAcquireLeaseOnEntity` is not the name of the method) ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java: ########## @@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time Review Comment: very reasonable method-level javadoc... but it turns out `epsilon` is not mentioned anywhere in class-level javadoc, so this method description lacks context. so, please add the class-level info. mentioning the name 'epsilon' is fine, but definitely also give it a more specific name, like "Lease Consolidation Period". ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java: ########## @@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception { <= sixthObtainedStatus.getLeaseAcquisitionTimestamp()); } + /* + test to verify if leasable entity is unavailable before epsilon time + to account for clock drift + */ + @Test + public void testWhenLeasableEntityUnavailable() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams3); + Thread.sleep(LESS_THAN_EPSILON); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3)); Review Comment: the whole idea is that a "similar" (but NOT same) lease isn't itself already within epsilon. 
hence, be sure to test `LeaseParams` that were NOT given to `tryAcquireLease` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java: ########## @@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) { } } + /* + Determines if a lease can be acquired for the given flow. A lease is acquirable if + no existing lease record exists in arbiter table or the record is older then epsilon time + */ + @Override + public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException { + Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams); + return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true; Review Comment: idiomatic: ``` return infoResult.map(result -> !result.isWithinEpsilon()).orElse(true); ``` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(null); } + /* + validates if lease can be acquired on the provided flowSpec, + else throw LeaseUnavailableException + */ + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { + if (!flowSpec.isScheduled()) { + Config flowConfig = flowSpec.getConfig(); + String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams); + try { + if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) { + throw new LeaseUnavailableException("Lease already 
occupied by another execution of this flow"); + } + } catch (IOException exception) { + _log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception); Review Comment: we called `dagManagementStateStore.isLeaseAcquirable(leaseParams)`... who said anything about "leaseArbiterTable"? :) (anyway, the table's name is dynamically set in config). instead: ``` _log.error("unable to check whether lease acquirable " + leaseParams, exception); ``` (also on the line below) ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(null); } + /* + validates if lease can be acquired on the provided flowSpec, + else throw LeaseUnavailableException + */ + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { + if (!flowSpec.isScheduled()) { + Config flowConfig = flowSpec.getConfig(); + String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams); Review Comment: keep it brief! (we just made improvements in that vein https://github.com/apache/gobblin/pull/4074 ) maybe: ``` _log.info("checking adhoc lease acquirability {}", leaseParams); ``` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +/** + * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity. + */ +public class LeaseUnavailableException extends RuntimeException { + public LeaseUnavailableException(String message) { Review Comment: beyond clearly naming for callers, impl-wise, this definitely relates to a flow, so that should be a ctor param. consider whether to allow a catcher to reach in to access the details as instance member(s) or merely to use internally in the ctor, to contextualize the `message` passed along to `super`. 
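To illustrate the ctor suggestion above, here is a minimal sketch rather than the PR's actual implementation. It assumes the `TooSoonToRerunSameFlowException` name floated in the earlier comment, and it assumes the flow is identified by plain `flowGroup`/`flowName` strings (hypothetical parameters; the real class might instead take a `FlowSpec`). This variant both contextualizes the `message` passed to `super` and keeps the details as instance members for a catcher to read:

```java
/**
 * Hypothetical sketch: thrown when an adhoc flow is re-submitted too soon
 * after a similar launch. The class name and both ctor params are assumptions.
 */
class TooSoonToRerunSameFlowException extends RuntimeException {
  private final String flowGroup;
  private final String flowName;

  TooSoonToRerunSameFlowException(String flowGroup, String flowName) {
    // The flow identity is used to contextualize the message handed to super.
    super(String.format("Too soon to rerun flow [group=%s, name=%s]", flowGroup, flowName));
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }

  // Accessors let a catcher reach in for the details instead of parsing the message.
  String getFlowGroup() {
    return flowGroup;
  }

  String getFlowName() {
    return flowName;
  }
}
```

If catchers only ever forward `getMessage()` (as the REST handler in this PR does), the two accessors could be dropped and the params used solely inside the ctor.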
########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -116,9 +124,10 @@ public void setUp() throws Exception { this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true); this.serviceLauncher.addService(flowCatalog); - + MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); MySqlDagManagementStateStore dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); + this.dagManagementStateStore=dagManagementStateStore; Review Comment: spaces around `=` ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java: ########## @@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) { return true; } + @Test + public void testcanAcquireLeaseOnEntity() throws Exception{ + Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + String flowName = "testFlow"; + String flowGroup = "testGroup"; + DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction); + Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams)); Review Comment: where's the test to exercise `false` path? 
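A rough sketch of why the `false` path matters, using hand-rolled stand-ins rather than Mockito or the real Gobblin classes. `LeaseCheck`, `DelegatingStore`, and `DelegationCheck` are hypothetical names invented for illustration, and `String` stands in for `LeaseParams`. A store that ignored its arbiter and always returned `true` would pass a `true`-only test; stubbing both answers shows the result really comes from the delegate:

```java
/** Hypothetical stand-in for the arbiter's check; String replaces LeaseParams for brevity. */
interface LeaseCheck {
  boolean isLeaseAcquirable(String leaseParams);
}

/** Hypothetical stand-in for a state store that forwards the check to its arbiter. */
class DelegatingStore {
  private final LeaseCheck arbiter;

  DelegatingStore(LeaseCheck arbiter) {
    this.arbiter = arbiter;
  }

  boolean isLeaseAcquirable(String leaseParams) {
    return arbiter.isLeaseAcquirable(leaseParams);
  }
}

class DelegationCheck {
  /** Returns true only if the store reflects BOTH stubbed answers of the arbiter. */
  static boolean delegatesBothWays() {
    boolean whenTrue = new DelegatingStore(params -> true).isLeaseAcquirable("testGroup.testFlow");
    boolean whenFalse = new DelegatingStore(params -> false).isLeaseAcquirable("testGroup.testFlow");
    return whenTrue && !whenFalse;
  }
}
```

In the actual test, the same idea would presumably be a second `Mockito.when(...).thenReturn(false)` case paired with `Assert.assertFalse(...)`.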
########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,60 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + /* + If no other flow has acquired lease within the epsilon time, then flow + compilation and addition to the store occurs normally + */ + @Test + public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + Assert.assertNotNull(addSpecResponse); + 
} + + /* + For Scheduled flow lease acquirable check does not occur, + and flow compilation occurs successfully + */ + @Test + public void onAddSpecForScheduledFlow() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + AddSpecResponse response = new AddSpecResponse<>(new Object()); + Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); Review Comment: don't you also need to add this mock: ``` Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); ``` (w/ the expectation it would never be called)? alternatively, verify that mock was never invoked ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -86,6 +92,8 @@ public class OrchestratorTest { private FlowSpec flowSpec; private ITestMetastoreDatabase testMetastoreDatabase; private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator; Review Comment: seems misnomer, as we no longer have any `Orchestrator` capable of using the DagMgr (now completely removed) instead of `FlowLaunchHandler`. suggest to rename merely to `orchestrator` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org