umustafi commented on code in PR #3954:
URL: https://github.com/apache/gobblin/pull/3954#discussion_r1618162179
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -238,8 +238,8 @@ public void testStartupSequenceHandlesFailures() throws Exception {
String jobName = "testJobName";
String flowExecutionId = "12345677";
- MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
- mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
+ DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
+ //mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
Review Comment:
Do we still need to add the job dag action here, now through
dagManagementStateStore? The old addJobDagAction call is only commented out.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -201,4 +196,77 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
* has any running job, false otherwise.
*/
public boolean hasRunningJobs(DagManager.DagId dagId);
+
+ /**
+ * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param jobName job name for the dag action
+ * @param dagActionType the value of the dag action
+ * @throws IOException
+ */
+ boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
+ DagActionStore.DagActionType dagActionType) throws IOException, SQLException;
+
+ /**
+ * Check if an action exists in dagAction store by flow group, flow name, and flow execution id, it assumes jobName is
+ * empty ("").
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param dagActionType the value of the dag action
+ * @throws IOException
+ */
+ boolean existsFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
+ DagActionStore.DagActionType dagActionType) throws IOException, SQLException;
+
+ /** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} for durability */
+ default void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
+ addJobDagAction(
+ dagAction.getFlowGroup(),
+ dagAction.getFlowName(),
+ dagAction.getFlowExecutionId(),
+ dagAction.getJobName(),
+ dagAction.getDagActionType());
+ }
+
+ /**
+ * Persist the dag action in {@link DagActionStore} for durability
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @param jobName job name for the dag action
+ * @param dagActionType the value of the dag action
+ * @throws IOException
+ */
+ void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
+ DagActionStore.DagActionType dagActionType) throws IOException;
Review Comment:
Why do this method and the exists*() methods have no default implementation
in the interface, while others (e.g. addDagAction) do?
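To make the question concrete, here is a rough sketch of one split that would be consistent: the methods that actually touch storage stay abstract, and the convenience overloads become defaults that only delegate. This is not the PR's code. `SketchDagAction`, `SketchStateStore`, and `InMemorySketchStateStore` are hypothetical stand-ins, the action type is a plain `String`, and checked exceptions are left out for brevity. Making `existsFlowDagAction` a default is my assumption, based on the javadoc saying it treats jobName as "".

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for DagActionStore.DagAction (plain strings only).
final class SketchDagAction {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final String jobName;
  final String dagActionType;

  SketchDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
      String dagActionType) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.jobName = jobName;
    this.dagActionType = dagActionType;
  }
}

// Hypothetical interface; checked exceptions are omitted to keep the sketch short.
interface SketchStateStore {
  // Primitive operations stay abstract: only an implementation knows the backing store.
  void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
      String dagActionType);

  boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId,
      String jobName, String dagActionType);

  // Convenience overload: a default that only delegates to the primitive above.
  default void addDagAction(SketchDagAction action) {
    addJobDagAction(action.flowGroup, action.flowName, action.flowExecutionId, action.jobName,
        action.dagActionType);
  }

  // Flow-level check: passes "" as the job name and delegates to the job-level check.
  default boolean existsFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
      String dagActionType) {
    return existsJobDagAction(flowGroup, flowName, flowExecutionId, "", dagActionType);
  }
}

// In-memory implementation, present only to exercise the sketch.
final class InMemorySketchStateStore implements SketchStateStore {
  private final Set<String> actions = new HashSet<>();

  private static String key(String flowGroup, String flowName, String flowExecutionId,
      String jobName, String dagActionType) {
    return String.join("|", flowGroup, flowName, flowExecutionId, jobName, dagActionType);
  }

  @Override
  public void addJobDagAction(String flowGroup, String flowName, String flowExecutionId,
      String jobName, String dagActionType) {
    actions.add(key(flowGroup, flowName, flowExecutionId, jobName, dagActionType));
  }

  @Override
  public boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId,
      String jobName, String dagActionType) {
    return actions.contains(key(flowGroup, flowName, flowExecutionId, jobName, dagActionType));
  }
}
```

With this shape an implementing class only has to supply the two job-level primitives. Whether that fits the real interface depends on how the implementations delegate to DagActionStore, which this sketch does not model.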
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -40,16 +40,18 @@
import org.apache.gobblin.service.FlowStatusId;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+
@Slf4j
public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends GobblinServiceFlowExecutionResourceHandler{
- private DagActionStore dagActionStore;
+ private DagManagementStateStore dagManagementStateStore;
@Inject
public GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionResourceLocalHandler handler,
@Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
- Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader, DagActionStore dagActionStore) {
+ Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader, DagManagementStateStore dagManagementStateStore) {
super(handler, eventBus, manager, forceLeader);
Review Comment:
Do we want to always route dag action addition/removal/checking through
DagManagementStateStore? The PR suggests that no reference to DagActionStore
should remain outside of DagManagementStateStore. However,
DagManagementStateStore provides a lot more functionality as well, and I'm
not sure we want to expose all of it everywhere. What do you think?
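One possible middle ground, sketched below purely as an illustration: keep the dag action operations behind a narrow interface that the wider store extends, and have the resource handler depend only on the narrow one. Every name here (`DagActionFacade`, `WideStateStore`, `InMemoryWideStore`, `SketchResourceHandler`, `requestAction`) is hypothetical and none of it is in the PR. Keys are plain strings to keep the sketch short.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical narrow view: only the dag action operations.
interface DagActionFacade {
  void addAction(String key);

  boolean existsAction(String key);

  boolean deleteAction(String key);
}

// Hypothetical wider store: also exposes dag state operations the handler should not need.
interface WideStateStore extends DagActionFacade {
  void checkpointDag(String dagId);
}

// In-memory implementation, present only to exercise the sketch.
final class InMemoryWideStore implements WideStateStore {
  private final Set<String> actions = new HashSet<>();
  private final Set<String> dags = new HashSet<>();

  @Override
  public void addAction(String key) {
    actions.add(key);
  }

  @Override
  public boolean existsAction(String key) {
    return actions.contains(key);
  }

  @Override
  public boolean deleteAction(String key) {
    return actions.remove(key);
  }

  @Override
  public void checkpointDag(String dagId) {
    dags.add(dagId);
  }
}

// Hypothetical handler: its field type is the narrow interface, so checkpointDag is not reachable.
final class SketchResourceHandler {
  private final DagActionFacade dagActions;

  SketchResourceHandler(DagActionFacade dagActions) {
    this.dagActions = dagActions;
  }

  // Records the action unless an identical one is already pending; returns true if it was added.
  boolean requestAction(String flowExecutionId, String actionType) {
    String key = flowExecutionId + ":" + actionType;
    if (dagActions.existsAction(key)) {
      return false;
    }
    dagActions.addAction(key);
    return true;
  }
}
```

The same store instance can be injected in both places. Only the declared type differs, so callers like this handler cannot reach the dag state methods without an explicit cast.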
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -180,9 +181,7 @@ public Set<String> getFailedDagIds() throws IOException {
@Override
// todo - updating different maps here and in addDagNodeState can result in inconsistency between the maps
public synchronized void deleteDagNodeState(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
- this.jobToDag.remove(dagNode);
Review Comment:
Should these be replaced with calls to the dagActionStore to add/delete the
dag-related state? Otherwise, why are these lines being removed?