[ 
https://issues.apache.org/jira/browse/GOBBLIN-1884?focusedWorklogId=877661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-877661
 ]

ASF GitHub Bot logged work on GOBBLIN-1884:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Aug/23 23:34
            Start Date: 22/Aug/23 23:34
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3746:
URL: https://github.com/apache/gobblin/pull/3746#discussion_r1302229484


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -493,24 +494,27 @@ public synchronized void setActive(boolean active) {
    * dagStore, we compile the flow to generate the dag before calling 
addDag(), handling any errors that may result in
    * the process.
    */
-  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
-    FlowId flowId = action.getFlowId();
-    FlowSpec spec;
+  public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
+    Preconditions.checkArgument(launchAction.getFlowActionType() == 
DagActionStore.FlowActionType.LAUNCH);
+    log.info("Handle launch flow event for action {}", launchAction);
+    FlowId flowId = launchAction.getFlowId();
     try {
-      log.info("Handle launch flow event for action: {}", action);
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
-      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
       Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
           
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
       if (optionalJobExecutionPlanDag.isPresent()) {
         addDag(optionalJobExecutionPlanDag.get(), true, true);
       }
+      // Upon handling the action, delete it so on leadership change this is 
not duplicated
+      this.dagActionStore.get().deleteDagAction(launchAction);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {} due to exception 
{}", flowId, e.getMessage());
     } catch (SpecNotFoundException e) {
       log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
     } catch (IOException e) {
-      log.warn("Failed to add Job Execution Plan for flowId {} due to 
exception {}", flowId, e.getMessage());
+      log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag 
action from dagActionStore due to "

Review Comment:
   maybe remind our maintainer (reading this log message) to `"...(check 
stacktrace) due to "`, just in case the either-or log message initially trips 
them up



##########
FlowTriggerHandlerTest.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.junit.Assert;
+import org.quartz.JobDataMap;
+import org.testng.annotations.Test;
+
+
+public class FlowTriggerHandlerTest {
+  String newCronExpression = "0 0 0 ? * * 2024";
+  long newEventToRevisit = 123L;
+  long newEventToTrigger = 456L;
+
+  /**
+   * Provides an input with all three values (cronExpression, 
reminderTimestamp, originalEventTime) set in the map
+   * Properties and checks that they are updated properly
+   */
+  @Test
+  public void testUpdatePropsInJobDataMap() {

Review Comment:
   I don't see that the `updatePropsInJobDataMap` method was even modified in 
this PR.  if that's the case, why is that the only thing these brand new tests 
cover?
   
   for instance, what about verifying the formulation of the reminder job key?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 877661)
    Time Spent: 0.5h  (was: 20m)

> Delete launch dag actions after handling them
> ---------------------------------------------
>
>                 Key: GOBBLIN-1884
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1884
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> There is a bug in which launch actions get accumulated and "stuck" in the dag 
> action store, so every time a leader is changed or the service is re-deployed 
> we will keep redoing old launch actions again and again. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to