[ https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944162&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944162 ]
ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------

Author: ASF GitHub Bot
Created on: 18/Nov/24 18:32
Start Date: 18/Nov/24 18:32
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1846985471

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -103,6 +103,13 @@ public interface DagManagementStateStore {
    */
   void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
 
+  /**
+   * Returns true if lease can be acquired on entity provided in leaseParams.
+   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
+   * and if the dag action event we're checking on is a reminder event
+   */
+  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;

Review Comment:
DMSS has no concept of leasing, since that's meant to be a lower-level impl detail. accordingly let's avoid `LeaseParams` in this interface.

given we already have this method:
```
boolean existsFlowDagAction(String flowGroup, String flowName, long flowExecutionId, DagActionStore.DagActionType dagActionType) throws IOException, SQLException;
```
how about this new one:
```
boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException, SQLException;
```
?

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
+ */
+public class LeaseUnavailableException extends RuntimeException {

Review Comment:
this name is misleading in our current context, where nobody even tried to acquire any lease (**that case** is anyway already represented by `LeaseAttemptStatus.LeasedToAnotherStatus`). how about `WouldNotBeLeasableException` or `TooSoonToRerunSameFlowException`? I prefer the latter, which clearly characterizes a restriction on using the API, whereas the former suggests impl. details, w/o specifically naming the problem (e.g. why not leasable?).
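Here is a minimal sketch of the exception along the lines the review suggests. The name `TooSoonToRerunSameFlowException` comes from the comment above. The flow identity is a constructor parameter, as the later comment on the constructor asks, so it both contextualizes the message and stays readable by a catcher. The `flowGroup`/`flowName` fields, their getters, and the message wording are illustrative assumptions, not the code that was merged.

```java
/**
 * Hypothetical sketch: thrown when an adhoc flow is submitted again while an
 * equivalent launch of the same flow is still too recent to allow a rerun.
 * Field names and message text are assumptions for illustration.
 */
class TooSoonToRerunSameFlowException extends RuntimeException {
  private final String flowGroup;
  private final String flowName;

  TooSoonToRerunSameFlowException(String flowGroup, String flowName) {
    // The constructor builds the message from the flow identity, so callers cannot omit that context.
    super(String.format("Too soon to rerun flow %s.%s: an equivalent launch is already in progress",
        flowGroup, flowName));
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }

  /** Lets a catcher (e.g. a REST handler) inspect which flow was rejected. */
  String getFlowGroup() {
    return flowGroup;
  }

  String getFlowName() {
    return flowName;
  }
}
```

A catcher such as the REST handler touched by this PR could still map it to `HttpStatus.S_409_CONFLICT`. It could also read the getters when it needs more than the message.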
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
 
+  /*
+  test to verify if leasable entity is unavailable before epsilon time
+  to account for clock drift
+   */
+  @Test
+  public void testWhenLeasableEntityUnavailable() throws Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams3);
+    Thread.sleep(LESS_THAN_EPSILON);
+    Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));
+  }
+
+  /*
+  test to verify if leasable entity exists post epsilon time
+   */
+  @Test
+  public void testWhenLeasableEntityAvailable() throws Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams4);
+    Thread.sleep(MORE_THAN_EPSILON);
+    Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4));

Review Comment:
same here

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -125,6 +128,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       _log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec);
       this.specCompiler.onAddSpec(addedSpec);
     } else if (addedSpec instanceof FlowSpec) {
+      validateAdhocFlowLeasability((FlowSpec) addedSpec);

Review Comment:
nit: "validate"/"verify" are good for methods returning a boolean. the entire purpose of this `void` method is to throw an exception. clearly indicate that with stronger naming, like "failIf..." or "enforce..."

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time
+   * in leaseArbiterStore, else returns false
+   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
+   *                    was triggered, and if the dag action event we're checking on is a reminder event
+   * @return true if lease can be acquired on the flow passed in the lease params, false otherwise
+   */
+  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)

Review Comment:
and apologies that I probably wasn't explaining clearly when earlier suggesting names like `existsLeasableEntity` (to mean that "**another one already** exists, historically")

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time
+   * in leaseArbiterStore, else returns false
+   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
+   *                    was triggered, and if the dag action event we're checking on is a reminder event
+   * @return true if lease can be acquired on the flow passed in the lease params, false otherwise
+   */
+  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)
Review Comment:
the method name itself suggests a pre-check capability (e.g. first check whether it's acquirable and if so, then `tryAcquireLease`... being assured of success). of course, because check-then-act patterns are susceptible to race conditions, we'd never actually provide such an API. let's not confuse anyone!

how about `boolean existsSimilarLeaseWithinConsolidationPeriod(LeaseParams)`? (or `existsEquivalentLeaseWithinConsolidationPeriod`)

##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
       responseMap = this.flowCatalog.put(flowSpec, true);
     } catch (QuotaExceededException e) {
       throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
-    } catch (Throwable e) {
+    } catch(LeaseUnavailableException e){
+      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
+    }
+    catch (Throwable e) {

Review Comment:
formatting

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
     return true;
   }
 
+  @Test
+  public void testcanAcquireLeaseOnEntity() throws Exception{

Review Comment:
camel case typo...
(but anyway, `canAcquireLeaseOnEntity` is not the name of the method)

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * This method checks if lease can be acquired on provided flow in lease params
+   * returns true if entry for the same flow does not exists within epsilon time

Review Comment:
very reasonable method-level javadoc... but it turns out `epsilon` is not mentioned anywhere in class-level javadoc, so this method description lacks context. so, please add the class-level info. mentioning the name 'epsilon' is fine, but definitely also give it a more specific name, like "Lease Consolidation Period".

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
 
+  /*
+  test to verify if leasable entity is unavailable before epsilon time
+  to account for clock drift
+   */
+  @Test
+  public void testWhenLeasableEntityUnavailable() throws Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams3);
+    Thread.sleep(LESS_THAN_EPSILON);
+    Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));

Review Comment:
the whole idea is that a "similar" (but NOT same) lease isn't itself already within epsilon.
hence, be sure to test `LeaseParams` that were NOT given to `tryAcquireLease`

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  /*
+  Determines if a lease can be acquired for the given flow. A lease is acquirable if
+  no existing lease record exists in arbiter table or the record is older then epsilon time
+   */
+  @Override
+  public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
+    Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);
+    return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;

Review Comment:
idiomatic:
```
return infoResult.map(result -> !result.isWithinEpsilon()).orElse(true);
```

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  /*
+  validates if lease can be acquired on the provided flowSpec,
+  else throw LeaseUnavailableException
+   */
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+    if (!flowSpec.isScheduled()) {
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      try {
+        if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
+          throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+        }
+      } catch (IOException exception) {
+        _log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception);

Review Comment:
we called `dagManagementStateStore.isLeaseAcquirable(leaseParams)`... who said anything about "leaseArbiterTable"? :) (anyway, the table's name is dynamically set in config). instead:
```
_log.error("unable to check whether lease acquirable " + leaseParams, exception);
```
(also on the line below)

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  /*
+  validates if lease can be acquired on the provided flowSpec,
+  else throw LeaseUnavailableException
+   */
+  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+    if (!flowSpec.isScheduled()) {
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);

Review Comment:
keep it brief! (we just made improvements in that vein https://github.com/apache/gobblin/pull/4074 ) maybe:
```
_log.info("checking adhoc lease acquirability {}", leaseParams);
```

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
+ */
+public class LeaseUnavailableException extends RuntimeException {
+  public LeaseUnavailableException(String message) {

Review Comment:
beyond clearly naming for callers, impl-wise, this definitely relates to a flow, so that should be a ctor param. consider whether to allow a catcher to reach in to access the details as instance member(s) or merely to use internally in the ctor, to contextualize the `message` passed along to `super`.
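The consolidation-period check discussed in the `MultiActiveLeaseArbiter` and `MysqlMultiActiveLeaseArbiter` comments can be illustrated with an in-memory stand-in. This is a simplified sketch under stated assumptions, not the arbiter's actual logic:

- The real arbiter reads a MySQL table and computes `isWithinEpsilon` there. Here, a `HashMap` keyed by flow group and name plays that role.
- `consolidationPeriodMillis` plays the role of epsilon.
- `InMemoryLeaseRecords` and `recordLease` are hypothetical names.

The query method borrows the reviewer's proposed name `existsSimilarLeaseWithinConsolidationPeriod`, but takes plain arguments instead of `LeaseParams`. It also uses the `java.util.Optional` chain `map(...).orElse(...)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical in-memory stand-in for the lease arbiter's table, used only to
 * illustrate the "lease consolidation period" (epsilon) check.
 */
class InMemoryLeaseRecords {
  private final long consolidationPeriodMillis;
  // key: "flowGroup.flowName" -> event time of the most recent lease recorded for that flow
  private final Map<String, Long> lastLeaseTimestamps = new HashMap<>();

  InMemoryLeaseRecords(long consolidationPeriodMillis) {
    this.consolidationPeriodMillis = consolidationPeriodMillis;
  }

  private static String key(String flowGroup, String flowName) {
    return flowGroup + "." + flowName;
  }

  /** Records a lease; the execution id is intentionally NOT part of the key, so other executions count as "similar". */
  void recordLease(String flowGroup, String flowName, long flowExecutionId, long eventTimeMillis) {
    lastLeaseTimestamps.put(key(flowGroup, flowName), eventTimeMillis);
  }

  /** True if a lease for the same flow group/name was recorded less than one consolidation period before nowMillis. */
  boolean existsSimilarLeaseWithinConsolidationPeriod(String flowGroup, String flowName, long nowMillis) {
    Optional<Long> lastLease = Optional.ofNullable(lastLeaseTimestamps.get(key(flowGroup, flowName)));
    // No record at all means no similar lease exists.
    return lastLease.map(ts -> nowMillis - ts < consolidationPeriodMillis).orElse(false);
  }
}
```

Because the execution id is excluded from the key, a different execution of the same flow is reported as similar. That is the case the test comment asks to cover. Like any read-only check, this only reports what already exists and reserves nothing for the caller.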
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -116,9 +124,10 @@ public void setUp() throws Exception {
     this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
         Optional.of(logger), Optional.absent(), true);
     this.serviceLauncher.addService(flowCatalog);
-
+    MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class);
     MySqlDagManagementStateStore dagManagementStateStore =
         spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    this.dagManagementStateStore=dagManagementStateStore;

Review Comment:
spaces around `=`

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
     return true;
   }
 
+  @Test
+  public void testcanAcquireLeaseOnEntity() throws Exception{
+    Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+    String flowName = "testFlow";
+    String flowGroup = "testGroup";
+    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
+    DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
+    Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));

Review Comment:
where's the test to exercise `false` path?
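On the `false` path, here is a plain-Java sketch (no Mockito) that exercises both stubbed outcomes. It has three parts:

- A state store that exposes a flow-level question and simply delegates to the arbiter. This follows the earlier suggestion that DMSS not expose leasing.
- An orchestrator-side guard named in the "enforce..." style suggested above.
- A check that runs the guard against both outcomes.

All type and method names except `existsCurrentlyLaunchingSimilarFlow` (proposed in the first comment) are hypothetical. `IOException` is omitted for brevity, and `IllegalStateException` stands in for the exception class under discussion. In the real test, `Mockito.when(...).thenReturn(false)` on the mocked arbiter would play the role of the lambda stubs.

```java
/** Hypothetical stand-in for the arbiter query; not Gobblin's MultiActiveLeaseArbiter. */
interface SimilarLeaseChecker {
  boolean existsSimilarLeaseWithinConsolidationPeriod(String flowGroup, String flowName, long flowExecutionId);
}

/** Hypothetical state store: asks a flow-level question and keeps leasing an internal detail. */
class DelegatingStateStore {
  private final SimilarLeaseChecker arbiter;

  DelegatingStateStore(SimilarLeaseChecker arbiter) {
    this.arbiter = arbiter;
  }

  boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) {
    return arbiter.existsSimilarLeaseWithinConsolidationPeriod(flowGroup, flowName, flowExecutionId);
  }
}

/** Hypothetical orchestrator-side guard; the "enforce" prefix signals that its only job is to throw. */
final class AdhocFlowGuard {
  private AdhocFlowGuard() {}

  static void enforceNoSimilarFlowLaunching(DelegatingStateStore store, String flowGroup, String flowName,
      long flowExecutionId) {
    if (store.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, flowExecutionId)) {
      // IllegalStateException stands in for the dedicated exception discussed in the review.
      throw new IllegalStateException("Too soon to rerun flow " + flowGroup + "." + flowName);
    }
  }
}

class GuardBothPathsCheck {
  /** Returns true only if the guard passes when nothing similar is launching and throws otherwise. */
  static boolean run() {
    // Stub: no similar flow launching, so the guard must not throw.
    DelegatingStateStore idle = new DelegatingStateStore((group, name, id) -> false);
    AdhocFlowGuard.enforceNoSimilarFlowLaunching(idle, "testGroup", "testFlow", 1L);

    // Stub: a similar flow is launching, so the guard must throw.
    DelegatingStateStore busy = new DelegatingStateStore((group, name, id) -> true);
    try {
      AdhocFlowGuard.enforceNoSimilarFlowLaunching(busy, "testGroup", "testFlow", 2L);
      return false; // guard should have thrown
    } catch (IllegalStateException expected) {
      return expected.getMessage().contains("testGroup.testFlow");
    }
  }
}
```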
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,60 @@ public void createFlowSpec() throws Throwable {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+  If another flow has already acquired lease for this flowspec details within
+  epsilon time, then we do not execute this flow, hence do not process and store the spec
+  and throw LeaseUnavailableException
+   */
+  @Test(expectedExceptions = LeaseUnavailableException.class)
+  public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+    dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+  }
+
+  /*
+  If no other flow has acquired lease within the epsilon time, then flow
+  compilation and addition to the store occurs normally
+   */
+  @Test
+  public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+    AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+    Assert.assertNotNull(addSpecResponse);
+  }
+
+  /*
+  For Scheduled flow lease acquirable check does not occur,
+  and flow compilation occurs successfully
+   */
+  @Test
+  public void onAddSpecForScheduledFlow() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    AddSpecResponse response = new AddSpecResponse<>(new Object());
+    Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);

Review Comment:
don't you also need to add this mock:
```
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
```
(w/ the expectation it would never be called)? alternatively, verify that mock was never invoked.

##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -86,6 +92,8 @@ public class OrchestratorTest {
   private FlowSpec flowSpec;
   private ITestMetastoreDatabase testMetastoreDatabase;
   private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;

Review Comment:
seems a misnomer, as we no longer have any `Orchestrator` capable of using the DagMgr (now completely removed) instead of `FlowLaunchHandler`.
suggest to rename merely to `orchestrator`


Issue Time Tracking
-------------------

Worklog Id: (was: 944162)
Time Spent: 1h 10m (was: 1h)

> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
> Key: GOBBLIN-2173
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Abhishek Jain
> Assignee: Abhishek Tiwari
> Priority: Critical
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them across service restart/failover scenarios. However, it is expected
> that once these flows are kicked off/forwarded to the DagProcEngine, they
> are removed from our flowspec DB.
> This is currently not consistently happening; there seem to be some edge
> cases where they remain persisted in the DB. This can be fatal for users such
> as DIL that run adhoc flows using the same flowgroup/flowname consistently,
> which will lead to their flows being stuck. We need to find which edge cases
> are not handling the flow spec deletion properly.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)