[ https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944297&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944297 ]
ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/24 09:55
            Start Date: 19/Nov/24 09:55
    Worklog Time Spent: 10m
      Work Description: vsinghal85 commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1848021734


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }

   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
     if (!flowSpec.isScheduled()) {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
-          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
-      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
-      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);

Review Comment:
   Apologies for the typo, updated to "existing adhoc flow entry"


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {

   /**
    * Returns true if lease can be acquired on entity provided in leaseParams.
-   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
-   *                    and if the dag action event we're checking on is a reminder event
+   * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @throws IOException
    */
-  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
+  boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;

Review Comment:
   Sure updated


Issue Time Tracking
-------------------

    Worklog Id:     (was: 944297)
    Time Spent: 3h 50m  (was: 3h 40m)

> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-2173
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Critical
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them in service restart/failover scenarios. However, it is expected
> that once these flows are kicked off/ forwarded to the DagProcEngine, they
> need to be removed from our flowspec db.
> This is currently not consistently happening, there seems to be some edge
> case(s) where they are persisted in the db. This can be fatal for users such
> as DIL that run adhoc flows using the same flowgroup/flowname consistently,
> which will lead to their flows being stuck. We need to find which edge cases
> are not handling the flow spec deletion properly.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)