umustafi commented on code in PR #3954:
URL: https://github.com/apache/gobblin/pull/3954#discussion_r1618162179


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -238,8 +238,8 @@ public void testStartupSequenceHandlesFailures() throws 
Exception {
     String jobName = "testJobName";
     String flowExecutionId = "12345677";
 
-    MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
-    mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
+    DagManagementStateStore dagManagementStateStore = 
mock(DagManagementStateStore.class);
+    //mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);

Review Comment:
   do we still need to add the jobDagAction through dagManagementStateStore?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -201,4 +196,77 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) 
throws IOException {
    * has any running job, false otherwise.
    */
   public boolean hasRunningJobs(DagManager.DagId dagId);
+
+  /**
+   * Check if an action exists in dagAction store by flow group, flow name, 
flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @param jobName job name for the dag action
+   * @param dagActionType the value of the dag action
+   * @throws IOException
+   */
+  boolean existsJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException;
+
+  /**
+   * Check if an action exists in dagAction store by flow group, flow name, 
and flow execution id, it assumes jobName is
+   * empty ("").
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @param dagActionType the value of the dag action
+   * @throws IOException
+   */
+  boolean existsFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId,
+      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException;
+
+  /** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} 
for durability */
+  default void addDagAction(DagActionStore.DagAction dagAction) throws 
IOException {
+    addJobDagAction(
+        dagAction.getFlowGroup(),
+        dagAction.getFlowName(),
+        dagAction.getFlowExecutionId(),
+        dagAction.getJobName(),
+        dagAction.getDagActionType());
+  }
+
+  /**
+   * Persist the dag action in {@link DagActionStore} for durability
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @param jobName job name for the dag action
+   * @param dagActionType the value of the dag action
+   * @throws IOException
+   */
+  void addJobDagAction(String flowGroup, String flowName, String 
flowExecutionId, String jobName,
+      DagActionStore.DagActionType dagActionType) throws IOException;

Review Comment:
   why no implementation for this one and exists() but present for others??



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -40,16 +40,18 @@
 import org.apache.gobblin.service.FlowStatusId;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+
 
 @Slf4j
 public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends 
GobblinServiceFlowExecutionResourceHandler{
-  private DagActionStore dagActionStore;
+  private DagManagementStateStore dagManagementStateStore;
   @Inject
   public 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionResourceLocalHandler
 handler,
       @Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
-      Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) 
boolean forceLeader, DagActionStore dagActionStore) {
+      Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) 
boolean forceLeader, DagManagementStateStore dagManagementStateStore) {
     super(handler, eventBus, manager, forceLeader);

Review Comment:
   do we want to always encapsulate the dagAction additional/removal/checking 
through dagManagement? it seems like it from the PR that no reference to it 
should remain outside of dagManagement, however dagManagement provides a lot 
more functionality as well I'm not sure we want to give access to everywhere. 
what do you think?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -180,9 +181,7 @@ public Set<String> getFailedDagIds() throws IOException {
   @Override
   // todo - updating different maps here and in addDagNodeState can result in 
inconsistency between the maps
   public synchronized void deleteDagNodeState(DagManager.DagId dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
-    this.jobToDag.remove(dagNode);

Review Comment:
   these should be replaced with calls to the dagActionStore right to 
delete/add dag related stuff? otherwise why do we have these removals



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to