[ https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944391 ]

ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/24 15:48
            Start Date: 19/Nov/24 15:48
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1848535757


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -394,8 +394,8 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
       if (!response.getValue().getFailures().isEmpty()) {
         for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
           Throwable error = entry.getValue().getError();
-          if (error instanceof TooSoonToRerunSameFlowException) {
-            throw (TooSoonToRerunSameFlowException) error;
+          if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) {
+            throw (TooSoonToRerunSameFlowException) error.getCause();

Review Comment:
   the cast isn't necessary (the reason I suggested wrapping `TooSoonToR...` was to enable uniform, type-agnostic code here).
   
   for a method with the signature `throws Throwable`, aren't these two equivalent?
   ```
   throw error.getCause();
   throw (T) error.getCause();
   ```
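   A minimal, self-contained sketch of that equivalence (the exception class below is a hypothetical stand-in, not Gobblin's `TooSoonToRerunSameFlowException`): inside a method declared `throws Throwable`, both statements compile and throw the very same object. The cast only narrows the static type; the one runtime difference would be a `ClassCastException` if the cause had some other type, which the preceding `instanceof` check already rules out.

   ```java
   public class RethrowCauseDemo {
     // Hypothetical stand-in for the real exception type, for illustration only.
     static class TooSoonToRerunSameFlowException extends RuntimeException {
       TooSoonToRerunSameFlowException(String message) {
         super(message);
       }
     }

     // No cast: the thrown expression has static type Throwable, which is legal
     // because the method declares `throws Throwable`.
     static void rethrowUncast(Throwable error) throws Throwable {
       throw error.getCause();
     }

     // With a cast: only the static type changes; the thrown object is identical.
     static void rethrowCast(Throwable error) throws Throwable {
       throw (TooSoonToRerunSameFlowException) error.getCause();
     }

     // Runs one of the two variants and returns whatever it threw.
     static Throwable thrownBy(Throwable error, boolean cast) {
       try {
         if (cast) {
           rethrowCast(error);
         } else {
           rethrowUncast(error);
         }
       } catch (Throwable t) {
         return t;
       }
       return null;
     }

     public static void main(String[] args) {
       Throwable error = new RuntimeException(new TooSoonToRerunSameFlowException("flowGroup.flowName"));
       Throwable uncast = thrownBy(error, false);
       Throwable cast = thrownBy(error, true);
       System.out.println(uncast == cast);                                     // true: same cause object
       System.out.println(uncast instanceof TooSoonToRerunSameFlowException);  // true
     }
   }
   ```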



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  /*
+    Determines if a lease can be acquired for the given flow. A lease is acquirable if
+    no existing lease record exists in arbiter table or the record is older then epsilon time
+   */

Review Comment:
   probably no need for this comment here in the impl, but if you want one, bring it into line w/ the orig from the interface
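   For reference, javadoc copies an interface method's comment onto an implementing method that has no doc comment of its own, so the impl can simply omit it. A minimal sketch, using hypothetical names (`LeaseArbiter`, `isLeaseAcquirable`, `epsilonMillis`) rather than the real `MysqlMultiActiveLeaseArbiter` API, and reading the diff's rule literally: a lease is acquirable if no record exists or the existing record is older than epsilon.

   ```java
   import java.util.Optional;

   public class LeaseArbiterDocSketch {
     /** Hypothetical interface; not Gobblin's actual lease arbiter API. */
     interface LeaseArbiter {
       /**
        * Determines if a lease can be acquired for the given flow. A lease is acquirable
        * if no lease record exists for the flow, or if the existing record is older than epsilon.
        *
        * @param existingLeaseTimestampMillis timestamp of the existing lease record, if any
        * @param nowMillis the current time in milliseconds
        * @return whether the lease can be acquired
        */
       boolean isLeaseAcquirable(Optional<Long> existingLeaseTimestampMillis, long nowMillis);
     }

     /** The overriding method has no comment of its own: javadoc inherits the interface's. */
     static class EpsilonLeaseArbiter implements LeaseArbiter {
       private final long epsilonMillis;

       EpsilonLeaseArbiter(long epsilonMillis) {
         this.epsilonMillis = epsilonMillis;
       }

       @Override
       public boolean isLeaseAcquirable(Optional<Long> existingLeaseTimestampMillis, long nowMillis) {
         // No record: acquirable. Otherwise acquirable only once the record is older than epsilon.
         return existingLeaseTimestampMillis
             .map(leaseTimestamp -> nowMillis - leaseTimestamp > epsilonMillis)
             .orElse(true);
       }
     }

     public static void main(String[] args) {
       LeaseArbiter arbiter = new EpsilonLeaseArbiter(1_000L);
       System.out.println(arbiter.isLeaseAcquirable(Optional.empty(), 5_000L));     // true
       System.out.println(arbiter.isLeaseAcquirable(Optional.of(4_500L), 5_000L));  // false
       System.out.println(arbiter.isLeaseAcquirable(Optional.of(3_000L), 5_000L));  // true
     }
   }
   ```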



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -24,6 +24,7 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;

Review Comment:
   this import belongs a few lines down w/ other apache gobblin pkgs



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,29 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  /*
+    enforces that a similar adhoc flow is not launching,
+    else throw TooSoonToRerunSameFlowException

Review Comment:
   nit: `{@link TooSoonToRerunSameFlowException}`
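   Related detail: javadoc only processes `/** ... */` comments, so `{@link ...}` is rendered only once the comment in the diff opens with `/**` rather than `/*`. A minimal sketch of the doc comment in that form; the method name, the `flowId` parameter, and the in-memory set of launching flows are hypothetical (the real check goes through the lease arbiter):

   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   public class AdhocFlowGuardSketch {
     /** Hypothetical stand-in for the real exception type. */
     static class TooSoonToRerunSameFlowException extends RuntimeException {
       TooSoonToRerunSameFlowException(String flowId) {
         super("Lease already occupied by another recent execution of this flow: " + flowId);
       }
     }

     // Hypothetical bookkeeping; the real Orchestrator consults a lease arbiter instead.
     private final Set<String> launchingFlowIds = ConcurrentHashMap.newKeySet();

     /**
      * Enforces that a similar adhoc flow is not already launching; otherwise throws
      * {@link TooSoonToRerunSameFlowException}.
      *
      * @param flowId hypothetical "flowGroup.flowName" identifier of the adhoc flow
      * @throws TooSoonToRerunSameFlowException if a flow with the same id is already launching
      */
     public void enforceSimilarAdhocFlowNotLaunching(String flowId) {
       // Set#add returns false when the id is already present.
       if (!launchingFlowIds.add(flowId)) {
         throw new TooSoonToRerunSameFlowException(flowId);
       }
     }

     public static void main(String[] args) {
       AdhocFlowGuardSketch guard = new AdhocFlowGuardSketch();
       guard.enforceSimilarAdhocFlowNotLaunching("flowGroup.flowName");
       try {
         guard.enforceSimilarAdhocFlowNotLaunching("flowGroup.flowName");
       } catch (TooSoonToRerunSameFlowException e) {
         System.out.println(e.getMessage());
       }
     }
   }
   ```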



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -324,10 +323,10 @@ public void createFlowSpec() throws Throwable {
 
   /*
      If another flow has already acquired lease for this flowspec details within
-     epsilon time, then we do not execute this flow, hence do not process and store the spec
-     and throw LeaseUnavailableException
+     lease consolidation time, then we do not execute this flow, hence do not process and store the spec
+     and throw RuntimeException
    */
-  @Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
+  @Test(expectedExceptions = RuntimeException.class)

Review Comment:
   after signing off, I realized my literal advice would compromise clarity, foul up tests, etc. (in this very way)
   
   sorry for that half-baked advice... try this instead:
   ```
   package org.apache.gobblin.runtime.api;
   
   import lombok.Getter;
   
   public class TooSoonToRerunSameFlowException extends RuntimeException {
     @Getter private final FlowSpec flowSpec;
   
     /**
      * Accounts for the unwrapping within {@code FlowCatalog#updateOrAddSpecHelper}'s {@code CallbackResult} error
      * handling for {@code SpecCatalogListener}s
      * @return {@code TooSoonToRerunSameFlowException} wrapped in another {@code TooSoonToRerunSameFlowException}
      */
     public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
       return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
     }
   
     public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
       this(flowSpec, null);
     }
   
     /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */
     private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
       super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
       this.flowSpec = flowSpec;
     }
   }
   ```
   
   then replace:
   ```
   throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
   ```
   with
   ```
   throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
   ```
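   To illustrate why this keeps the type intact end to end, here is a simplified, self-contained sketch: a `String` stands in for `FlowSpec`, and `onAddSpec`/`updateOrAddSpec` are hypothetical stand-ins for the listener and for the unwrapping in `FlowCatalog#updateOrAddSpecHelper`. Because the wrapper and its cause share the same type, the caller still receives a `TooSoonToRerunSameFlowException` after one unwrap, so the test can go back to `@Test(expectedExceptions = TooSoonToRerunSameFlowException.class)`.

   ```java
   public class WrappedOnceDemo {
     // Simplified copy of the suggested class; a String stands in for FlowSpec.
     static class TooSoonToRerunSameFlowException extends RuntimeException {
       final String flowSpec;

       static TooSoonToRerunSameFlowException wrappedOnce(String flowSpec) {
         return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
       }

       TooSoonToRerunSameFlowException(String flowSpec) {
         this(flowSpec, null);
       }

       private TooSoonToRerunSameFlowException(String flowSpec, Throwable cause) {
         super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
         this.flowSpec = flowSpec;
       }
     }

     // Hypothetical listener standing in for Orchestrator#onAddSpec.
     static void onAddSpec(String flowSpec) {
       throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
     }

     // Hypothetical stand-in for the FlowCatalog handling: unwrap once, no cast needed.
     static void updateOrAddSpec(String flowSpec) throws Throwable {
       try {
         onAddSpec(flowSpec);
       } catch (RuntimeException error) {
         if (error instanceof TooSoonToRerunSameFlowException) {
           throw error.getCause();
         }
         throw error;
       }
     }

     // Returns whatever updateOrAddSpec throws, or null if it completes normally.
     static Throwable thrownBy(String flowSpec) {
       try {
         updateOrAddSpec(flowSpec);
       } catch (Throwable t) {
         return t;
       }
       return null;
     }

     public static void main(String[] args) {
       Throwable thrown = thrownBy("flowGroup.flowName");
       // After one unwrap the caller still sees the specific exception type.
       System.out.println(thrown instanceof TooSoonToRerunSameFlowException);  // true
       System.out.println(thrown.getCause());                                   // null
     }
   }
   ```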





Issue Time Tracking
-------------------

    Worklog Id:     (was: 944391)
    Time Spent: 5h  (was: 4h 50m)

> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-2173
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Critical
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them across service restarts/failovers. However, once these flows
> are kicked off/forwarded to the DagProcEngine, they are expected to be
> removed from our flowspec DB.
> This is currently not happening consistently; there seem to be some edge
> case(s) where they remain persisted in the DB. This can be fatal for users,
> such as DIL, that consistently run adhoc flows with the same
> flowgroup/flowname, because their flows end up stuck. We need to find which
> edge cases do not handle the flow spec deletion properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
