[ https://issues.apache.org/jira/browse/GOBBLIN-2062?focusedWorklogId=917928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-917928 ]

ASF GitHub Bot logged work on GOBBLIN-2062:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/May/24 20:28
            Start Date: 06/May/24 20:28
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1591493291


##########
gobblin-restli/server.gradle:
##########
@@ -45,6 +45,7 @@ dependencies {
   }
 
   compile externalDependency.gson
+  compile externalDependency.lombok

Review Comment:
   unclear how this was missed... but not having it meant MANY compilation warnings



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -81,12 +84,15 @@ public class OrchestratorTest {
 
   private FlowCatalog flowCatalog;
   private FlowSpec flowSpec;
-  private Orchestrator orchestrator;
+
+  private FlowStatusGenerator mockFlowStatusGenerator;
+  private DagManager mockDagManager;
+  private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_TABLE = "quotas";
 
-  @BeforeClass
+  @BeforeMethod

Review Comment:
   unfortunately lots of "testing smells" here.
   
   first off, class-level test init doesn't play well w/ verifying mock interactions, since we want a fresh count for the exec of each test method.  relatedly, it also complicates, if not outright forecloses, parallel test method execution.
   
   secondly, the `@Test(dependsOnMethods = ...)` structuring is a testing anti-pattern that here obscured that `setup`-style init had instead been formulated as a test of its own: `createTopologySpec()`.  a major clue that something was wrong is that this particular `@Test` method does not even exercise `Orchestrator`, our class-under-test!
   
   for now, I've fixed this suite to follow per-method setup/teardown best practices, but left a TODO to figure out where `createTopologySpec` actually belongs.  `createFlowSpec` and `deleteFlowSpec` also deserve attention in this same regard.
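The first point (class-level init vs. a fresh interaction count per test) can be illustrated without TestNG or Mockito. The framework-free sketch below is only an analogy: `CountingCleaner`, `runOneTest`, and `runSuite` are hypothetical names, and the `Supplier` stands in for whether the fixture is built once (as with `@BeforeClass`) or before every test method (as with `@BeforeMethod`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Framework-free sketch: why per-method fixtures matter when counting interactions.
 * CountingCleaner is a hypothetical stand-in for a mocked collaborator, not a Gobblin class.
 */
public class FixtureScopeSketch {

  /** Hypothetical collaborator that records how often it was called. */
  static final class CountingCleaner {
    private int calls = 0;

    void cleanup(String flowName) {
      calls++;
    }

    int calls() {
      return calls;
    }
  }

  /** Each "test method" triggers exactly one cleanup and reports the count it observes. */
  static int runOneTest(CountingCleaner cleaner) {
    cleaner.cleanup("someFlow");
    return cleaner.calls();
  }

  /** Runs n "test methods", obtaining a fixture from the supplier before each one. */
  static List<Integer> runSuite(int n, Supplier<CountingCleaner> fixturePerTest) {
    List<Integer> observed = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      observed.add(runOneTest(fixturePerTest.get()));
    }
    return observed;
  }

  public static void main(String[] args) {
    // class-level init: one shared instance, so counts leak across tests
    CountingCleaner shared = new CountingCleaner();
    System.out.println(runSuite(3, () -> shared));         // [1, 2, 3]
    // per-method init: a fresh instance before every test
    System.out.println(runSuite(3, CountingCleaner::new)); // [1, 1, 1]
  }
}
```

With the shared fixture, an assertion such as "cleanup was called exactly once" would hold only for whichever test happens to run first.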



##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -223,6 +223,8 @@ public static GobblinServiceManager createTestGobblinServiceManager(Properties s
 
     DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
     doNothing().when(spiedDagManager).setActive(anyBoolean());
+    // WARNING: this `spiedDagManager` WILL NOT BE the one used by the `Orchestrator`: its DM has apparently already been
+    // provided to the `Orchestrator` ctor, prior to this replacement here of `GobblinServiceManager.dagManager`

Review Comment:
   documenting for posterity, since this was, unfortunately, quite troublesome to debug
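The pitfall in that WARNING comment boils down to ordinary Java reference capture: an object that received a collaborator through its constructor keeps that reference, so later reassigning the owner's field has no effect on it. The sketch below reproduces this with hypothetical `Holder`, `Consumer`, and `Manager` classes standing in, loosely, for `GobblinServiceManager`, `Orchestrator`, and `DagManager`; it does not model Mockito's `spy` itself.

```java
/**
 * Sketch of constructor-captured references. All class names are hypothetical
 * stand-ins, not Gobblin classes.
 */
public class CapturedReferenceSketch {

  static class Manager {
    final String label;

    Manager(String label) {
      this.label = label;
    }
  }

  /** Plays the role of the Orchestrator: captures its Manager at construction time. */
  static class Consumer {
    private final Manager manager;

    Consumer(Manager manager) {
      this.manager = manager;
    }

    String whoHandles() {
      return manager.label;
    }
  }

  /** Plays the role of the service manager that owns both objects. */
  static class Holder {
    Manager manager = new Manager("original");
    // field initializers run in declaration order, so this captures "original"
    final Consumer consumer = new Consumer(manager);
  }

  public static void main(String[] args) {
    Holder holder = new Holder();
    // analogous to replacing GobblinServiceManager.dagManager with a spy afterwards
    holder.manager = new Manager("spy");
    System.out.println(holder.manager.label);         // spy
    System.out.println(holder.consumer.whoHandles()); // original
  }
}
```

The implication is that a replacement is visible to the consumer only if it is in place before the consumer is constructed, or is handed to the consumer directly.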



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java:
##########
@@ -57,6 +58,8 @@ public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
   current LeaseObtainedStatus via the completeLease method from a caller without access to the {@link MultiActiveLeaseArbiter}.
   */
   @Data
+  // avoid - warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object

Review Comment:
   another preventable source of compilation warnings
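For context, Lombok emits that warning when `@Data` (which implies `@EqualsAndHashCode`) sits on a class that extends something other than `java.lang.Object` without stating whether superclass state should participate; declaring `@EqualsAndHashCode(callSuper = false)` or `callSuper = true` makes the choice explicit. The excerpt above stops at the comment, so which option the PR chose is not shown here. As a rough, hand-written approximation (not Lombok's actual generated code), `callSuper = false` semantics look like this; the class and field names are hypothetical.

```java
import java.util.Objects;

/**
 * Hand-written approximation of callSuper = false equality: only the subclass's own
 * fields are compared. Names are hypothetical, not the real LeaseAttemptStatus hierarchy.
 */
public class CallSuperSketch {

  static class BaseStatus {
    // superclass state that callSuper = false deliberately leaves out of equals/hashCode
    final String note;

    BaseStatus(String note) {
      this.note = note;
    }
  }

  static final class ObtainedStatus extends BaseStatus {
    final String leaseId;
    final long eventTimeMillis;

    ObtainedStatus(String note, String leaseId, long eventTimeMillis) {
      super(note);
      this.leaseId = leaseId;
      this.eventTimeMillis = eventTimeMillis;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof ObtainedStatus)) {
        return false;
      }
      ObtainedStatus other = (ObtainedStatus) o;
      // no super.equals(o): only this class's fields participate
      return eventTimeMillis == other.eventTimeMillis && Objects.equals(leaseId, other.leaseId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(leaseId, eventTimeMillis);
    }
  }

  public static void main(String[] args) {
    ObtainedStatus a = new ObtainedStatus("first", "lease-1", 42L);
    ObtainedStatus b = new ObtainedStatus("second", "lease-1", 42L);
    System.out.println(a.equals(b));                  // true: `note` is ignored
    System.out.println(a.hashCode() == b.hashCode()); // true
  }
}
```

Opting out this way is only appropriate when the superclass carries no state that should factor into equality.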



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -229,33 +229,39 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
         _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
             launchDagAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
       } else {
-        TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
-        Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
-        Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-            this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
-                flowName, flowMetadata);
-
-        if (!compiledDagOptional.isPresent()) {
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-          return;
-        }
-        Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-        if (compiledDag.isEmpty()) {
-          FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+        try {
+          TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+          Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+          Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+              this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
+                  flowName, flowMetadata);
+
+          if (!compiledDagOptional.isPresent()) {
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            return;
+          }
+          Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+          if (compiledDag.isEmpty()) {
+            FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
+                flowMetadata);
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+                SharedFlowMetricsSingleton.CompiledState.FAILED);
+            _log.warn("Cannot determine an executor to run on for Spec: " + spec);
+            return;
+          }
           sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-              SharedFlowMetricsSingleton.CompiledState.FAILED);
-          _log.warn("Cannot determine an executor to run on for Spec: " + spec);
-          return;
-        }
-        sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-            SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+              SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
 
-        FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
-        flowCompilationTimer.stop(flowMetadata);
+          FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
+          flowCompilationTimer.stop(flowMetadata);
 
-        // Depending on if DagManager is present, handle execution
-        submitFlowToDagManager(flowSpec, compiledDag);
+          // Depending on if DagManager is present, handle execution
+          submitFlowToDagManager(flowSpec, compiledDag);
+        } finally {
+          // remove from the flow catalog, regardless of whether the flow was successfully validated and permitted to exec (concurrently)
+          this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+        }

Review Comment:
   too bad the diff above doesn't clearly indicate that this was solely an indentation change to add the `try ... finally` here.  its purpose is to ensure FlowCatalog cleanup, come what may.
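The control flow that the `try ... finally` guarantees can be shown in isolation. In this simplified sketch (not the real `Orchestrator`), an early `return` on a rejected flow, a normal completion, and a thrown exception all pass through the `finally` block. `FinallyCleanupSketch`, `compile`, and the `Set`-based catalog are hypothetical; the sketch removes every flow, whereas the real `removeFlowSpecIfAdhoc` is, judging by its name, limited to adhoc flows.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/**
 * Sketch of cleanup in a finally block: runs on early return, normal completion,
 * and exceptions alike. All names are hypothetical.
 */
public class FinallyCleanupSketch {

  final Set<String> catalog = new HashSet<>();

  /** Stand-in for validation: an empty Optional means rejected (e.g. a concurrent exec). */
  static Optional<String> compile(String flow, boolean concurrentRunInProgress) {
    return concurrentRunInProgress ? Optional.empty() : Optional.of("plan-for-" + flow);
  }

  void orchestrate(String flow, boolean concurrentRunInProgress, boolean failOnSubmit) {
    try {
      Optional<String> plan = compile(flow, concurrentRunInProgress);
      if (!plan.isPresent()) {
        return; // the finally block still runs
      }
      if (failOnSubmit) {
        throw new IllegalStateException("submit failed for " + plan.get());
      }
      // ... submit the plan for execution ...
    } finally {
      // analogous to dagManager.removeFlowSpecIfAdhoc(flowSpec); every flow is treated as adhoc here
      catalog.remove(flow);
    }
  }

  public static void main(String[] args) {
    FinallyCleanupSketch o = new FinallyCleanupSketch();
    o.catalog.add("adhoc");
    o.orchestrate("adhoc", true, false);
    System.out.println(o.catalog.contains("adhoc")); // false

    o.catalog.add("adhoc");
    try {
      o.orchestrate("adhoc", false, true);
    } catch (IllegalStateException e) {
      // the exception still propagates, but only after cleanup ran
    }
    System.out.println(o.catalog.contains("adhoc")); // false
  }
}
```

Note that `finally` does not swallow the exception; callers still see the failure, the cleanup simply happens first.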



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -127,19 +133,31 @@ public void setup() throws Exception {
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
-    this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
-        this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
-        Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
-        new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
-    this.topologyCatalog.addListener(orchestrator);
-    this.flowCatalog.addListener(orchestrator);
+    FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockFlowStatusGenerator);
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),

Review Comment:
   renamed to avoid confusion, as I now pass `Optional.absent()` for the `FlowLaunchHandler`.  nothing in the previously-existing tests below strictly validated that code path, so I made this entire `OrchestratorTest` class specific to the legacy `DagManager` version.
   
   as for the lack of validation of the other code path, that suggests a missing test: e.g. one verifying that `orchestrate` invokes `FlowLaunchHandler::handleFlowLaunchTriggerEvent`
   
   cc: @umustafi and @arjun4084346 
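The suggested missing test might look roughly like the following. To stay self-contained it avoids Mockito and uses a recording fake in place of `verify(...)`; `SimplifiedOrchestrator`, `LaunchHandler`, and `RecordingLaunchHandler` are hypothetical simplifications of the real classes, and the sketch uses `java.util.Optional` where the Gobblin code uses Guava's `Optional` (hence `Optional.absent()` above).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Sketch of testing both orchestrate(...) paths: delegate to a launch handler when
 * present, otherwise fall back to the legacy path. All names are hypothetical.
 */
public class LaunchHandlerPathSketch {

  interface LaunchHandler {
    void handleFlowLaunchTriggerEvent(String flowName);
  }

  /** Recording fake, standing in for a Mockito mock plus verify(...). */
  static final class RecordingLaunchHandler implements LaunchHandler {
    final List<String> handled = new ArrayList<>();

    @Override
    public void handleFlowLaunchTriggerEvent(String flowName) {
      handled.add(flowName);
    }
  }

  static final class SimplifiedOrchestrator {
    private final Optional<LaunchHandler> launchHandler;
    final List<String> compiledLocally = new ArrayList<>();

    SimplifiedOrchestrator(Optional<LaunchHandler> launchHandler) {
      this.launchHandler = launchHandler;
    }

    void orchestrate(String flowName) {
      if (launchHandler.isPresent()) {
        launchHandler.get().handleFlowLaunchTriggerEvent(flowName);
      } else {
        compiledLocally.add(flowName); // legacy DagManager-style path
      }
    }
  }

  public static void main(String[] args) {
    RecordingLaunchHandler handler = new RecordingLaunchHandler();
    SimplifiedOrchestrator withHandler = new SimplifiedOrchestrator(Optional.of(handler));
    withHandler.orchestrate("flowA");
    System.out.println(handler.handled);             // [flowA]
    System.out.println(withHandler.compiledLocally); // []

    SimplifiedOrchestrator legacy = new SimplifiedOrchestrator(Optional.empty());
    legacy.orchestrate("flowB");
    System.out.println(legacy.compiledLocally);      // [flowB]
  }
}
```

Asserting on both paths in separate tests is what keeps a suite like `OrchestratorTest` from silently covering only one of them.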





Issue Time Tracking
-------------------

    Worklog Id:     (was: 917928)
    Time Spent: 20m  (was: 10m)

> adhoc flow failure due to concurrent execs must be removed from flow catalog
> ----------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2062
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2062
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> the Orchestrator + DagManager MUST remove adhoc flows that violate concurrent 
> execs from the flow catalog.  otherwise gaas will continue to return '409 
> Conflict' to each subsequent attempt to create an adhoc flow with the same 
> flowGroup+flowName.  this is despite the fact that the flow (which still 
> remains in the FlowCatalog, when it shouldn't be) already has the status 
> FAILED, which is a "final status".
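The failure mode described in this issue can be modeled with a catalog keyed on flowGroup+flowName that rejects duplicate creates. The `InMemoryFlowCatalog` below is hypothetical (not Gobblin's `FlowCatalog` or its REST layer), and the 201 success code is an assumption; only the 409 on a duplicate comes from the issue text.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of why a stale adhoc spec causes repeated 409s. All names and the
 * success status code are hypothetical.
 */
public class StaleAdhocFlowSketch {

  static final int CREATED = 201;  // assumed success code, for illustration only
  static final int CONFLICT = 409; // as described in the issue

  static final class InMemoryFlowCatalog {
    // keyed by (flowGroup, flowName); a List key avoids delimiter collisions
    private final Map<List<String>, String> specs = new HashMap<>();

    int create(String flowGroup, String flowName) {
      // putIfAbsent returns null only when no spec existed under this key
      return specs.putIfAbsent(Arrays.asList(flowGroup, flowName), "spec") == null ? CREATED : CONFLICT;
    }

    void remove(String flowGroup, String flowName) {
      specs.remove(Arrays.asList(flowGroup, flowName));
    }
  }

  public static void main(String[] args) {
    InMemoryFlowCatalog catalog = new InMemoryFlowCatalog();
    System.out.println(catalog.create("g", "f")); // 201
    // the run FAILS a concurrent-execution check, but nothing removes the spec:
    System.out.println(catalog.create("g", "f")); // 409, and so is every later retry
    // once the adhoc spec is removed regardless of outcome, a retry succeeds:
    catalog.remove("g", "f");
    System.out.println(catalog.create("g", "f")); // 201
  }
}
```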



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
