[ https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944391 ]
ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/24 15:48
            Start Date: 19/Nov/24 15:48
    Worklog Time Spent: 10m
      Work Description: phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1848535757


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -394,8 +394,8 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
       if (!response.getValue().getFailures().isEmpty()) {
         for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
           Throwable error = entry.getValue().getError();
-          if (error instanceof TooSoonToRerunSameFlowException) {
-            throw (TooSoonToRerunSameFlowException) error;
+          if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) {
+            throw (TooSoonToRerunSameFlowException) error.getCause();

Review Comment:
   the cast isn't necessary (the reason I suggested wrapping `TooSoonToR...` was to enable uniform, type-agnostic code here.)

   for a method with the signature `throws Throwable`, aren't these two equivalent?
   ```
   throw error.getCause();
   throw (T) error.getCause();
   ```


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  /*
+  Determines if a lease can be acquired for the given flow. A lease is acquirable if
+  no existing lease record exists in arbiter table or the record is older then epsilon time
+  */

Review Comment:
   probably no need for this comment here in the impl, but if you want one, bring it into line w/ the orig from the interface


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -24,6 +24,7 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;

Review Comment:
   this import belongs a few lines down w/ other apache gobblin pkgs


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,29 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  /*
+  enforces that a similar adhoc flow is not launching,
+  else throw TooSoonToRerunSameFlowException

Review Comment:
   nit: `{@link TooSoonToRerunSameFlowException}`


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -324,10 +323,10 @@ public void createFlowSpec() throws Throwable {
   /*
   If another flow has already acquired lease for this flowspec details within
-  epsilon time, then we do not execute this flow, hence do not process and store the spec
-  and throw LeaseUnavailableException
+  lease consolidation time, then we do not execute this flow, hence do not process and store the spec
+  and throw RuntimeException
   */
-  @Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
+  @Test(expectedExceptions = RuntimeException.class)

Review Comment:
   after signing off, I realized my literal advice would compromise clarity, foul up tests, etc. (in this very way)

   sorry for that half-baked advice...
   try this instead:
   ```
   import lombok.Getter;

   public class TooSoonToRerunSameFlowException extends RuntimeException {
     @Getter
     private final FlowSpec flowSpec;

     /**
      * Account for unwrapping within {@link FlowCatalog#updateOrAddSpecHelper}'s `CallbackResult` error handling for `SpecCatalogListener`s
      * @return `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException`
      */
     public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
       return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
     }

     public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
       this(flowSpec, null);
     }

     /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */
     private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
       super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
       this.flowSpec = flowSpec;
     }
   }
   ```

   then replace:
   ```
   throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
   ```
   with
   ```
   throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
   ```




Issue Time Tracking
-------------------

    Worklog Id:     (was: 944391)
    Time Spent: 5h  (was: 4h 50m)

> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-2173
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Critical
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them across service restart/failover scenarios. However, once these
> flows are kicked off/forwarded to the DagProcEngine, they are expected to be
> removed from our flowspec DB.
> This is not consistently happening; there seem to be some edge case(s) where
> they remain persisted in the DB. This can be fatal for users such as DIL, who
> consistently run adhoc flows with the same flowgroup/flowname, because their
> flows will get stuck. We need to find which edge cases do not handle flow spec
> deletion properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
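For illustration only, here is a rough, self-contained sketch of the round trip the review thread proposes: a listener throws `TooSoonToRerunSameFlowException.wrappedOnce(flowSpec)`, and the catalog-side check rethrows the captured error's cause without a cast, relying on a `throws Throwable` signature. It assumes a hypothetical `FakeFlowSpec` in place of Gobblin's `FlowSpec`, replaces Lombok's `@Getter` with a plain getter, and models the check in `FlowCatalog#updateOrAddSpecHelper` as a hypothetical `rethrowIfTooSoon` helper; it does not reproduce Gobblin's actual `CallbackResult` handling.

```java
import java.util.Objects;

/**
 * Sketch of the "wrappedOnce" round trip discussed in the review.
 * Assumptions: FakeFlowSpec is a hypothetical stand-in for Gobblin's FlowSpec,
 * a hand-written getter replaces Lombok's @Getter, and rethrowIfTooSoon is a
 * hypothetical helper modeling the check in FlowCatalog#updateOrAddSpecHelper.
 */
public class WrappedOnceSketch {

  /** Hypothetical stand-in for FlowSpec; only its toString() is used here. */
  public static final class FakeFlowSpec {
    private final String uri;

    public FakeFlowSpec(String uri) {
      this.uri = Objects.requireNonNull(uri, "uri");
    }

    @Override
    public String toString() {
      return uri;
    }
  }

  public static class TooSoonToRerunSameFlowException extends RuntimeException {
    private final FakeFlowSpec flowSpec;

    /** Returns an instance whose cause is another instance for the same spec. */
    public static TooSoonToRerunSameFlowException wrappedOnce(FakeFlowSpec flowSpec) {
      return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
    }

    public TooSoonToRerunSameFlowException(FakeFlowSpec flowSpec) {
      this(flowSpec, null);
    }

    private TooSoonToRerunSameFlowException(FakeFlowSpec flowSpec, Throwable cause) {
      super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
      this.flowSpec = flowSpec;
    }

    public FakeFlowSpec getFlowSpec() {
      return flowSpec;
    }
  }

  /**
   * Rethrows the inner exception when a listener's captured error is a RuntimeException
   * wrapping a TooSoonToRerunSameFlowException. Because this method declares
   * `throws Throwable`, error.getCause() can be thrown without a cast.
   */
  public static void rethrowIfTooSoon(Throwable error) throws Throwable {
    if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) {
      throw error.getCause();
    }
  }

  public static void main(String[] args) {
    FakeFlowSpec spec = new FakeFlowSpec("myGroup/myFlow");
    // What a listener (e.g. the Orchestrator) would throw under the suggested pattern
    TooSoonToRerunSameFlowException thrownByListener = TooSoonToRerunSameFlowException.wrappedOnce(spec);
    try {
      rethrowIfTooSoon(thrownByListener);
      System.out.println("not rethrown");
    } catch (TooSoonToRerunSameFlowException e) {
      // The caller receives the inner exception, so a test can keep expecting this type
      System.out.println("caught " + e.getClass().getSimpleName() + " for " + e.getFlowSpec());
    } catch (Throwable t) {
      System.out.println("unexpected: " + t);
    }
  }
}
```

Running `main` prints `caught TooSoonToRerunSameFlowException for myGroup/myFlow`. Because the outer wrapper is itself a `RuntimeException`, the catalog's type-agnostic `error.getCause()` check still matches, while what escapes to the caller is the specific exception type; presumably that is what lets a test like `OrchestratorTest` keep `expectedExceptions = TooSoonToRerunSameFlowException.class` rather than the vaguer `RuntimeException.class`.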