[ https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=943962&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-943962 ]
ASF GitHub Bot logged work on GOBBLIN-2173: ------------------------------------------- Author: ASF GitHub Bot Created on: 15/Nov/24 17:13 Start Date: 15/Nov/24 17:13 Worklog Time Spent: 10m Work Description: khandelwal-prateek commented on code in PR #4076: URL: https://github.com/apache/gobblin/pull/4076#discussion_r1844070218 ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java: ########## @@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time + * in leaseArbiterStore + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + */ Review Comment: Please add `@return` in javadocs ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java: ########## @@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time + * in leaseArbiterStore + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + */ + boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) Review Comment: consider renaming this to `isLeaseAcquirable` for conciseness and to be consistent with other method names 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(null); } + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { + if (!flowSpec.isScheduled()) { + Config flowConfig = flowSpec.getConfig(); + String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + try { + if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) { + throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); Review Comment: add an info log here with `flowGroup`, `flowName`.. it would be useful in debugging ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java: ########## @@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) { } } + @Override + public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { Review Comment: Please add javadoc.. something like: `Determines if a lease can be acquired for the given flow. 
A lease is acquirable if ...` ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + @Test + public void testOnAddSpec_withFlowSpec_Available() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); Review Comment: can we use `this.flowSpec` here since it already has the schedule? 
########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { Review Comment: let's also test the scenario when `canAcquireLeaseOnEntity` returns `true` for adhoc flow since the other test is for scheduled flow ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(null); } + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { Review Comment: add javadoc ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + 
Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + @Test + public void testOnAddSpec_withFlowSpec_Available() throws IOException { Review Comment: use camelCase for naming methods Issue Time Tracking ------------------- Worklog Id: (was: 943962) Remaining Estimate: 0h Time Spent: 10m > Adhoc flows are not being deleted from GaaS FlowSpec store > ---------------------------------------------------------- > > Key: GOBBLIN-2173 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2173 > Project: Apache Gobblin > Issue Type: Bug > Components: gobblin-service > Reporter: Abhishek Jain > Assignee: Abhishek Tiwari > Priority: Critical > Time Spent: 10m > Remaining Estimate: 0h > > In GaaS, we store adhoc flows temporarily in our flowspec DB in order to > persist them in service restart/failover scenarios. However, it is expected > that once these flows are kicked off/forwarded to the DagProcEngine, they > are removed from our flowspec db. > This is currently not happening consistently; there seem to be some edge > cases where they remain persisted in the db. This can be fatal for users such > as DIL that run adhoc flows using the same flowgroup/flowname consistently, > which will lead to their flows being stuck. We need to find which edge cases > are not handling the flow spec deletion properly. -- This message was sent by Atlassian Jira (v8.20.10#820010)