[ 
https://issues.apache.org/jira/browse/GOBBLIN-1778?focusedWorklogId=843586&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-843586
 ]

ASF GitHub Bot logged work on GOBBLIN-1778:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Feb/23 21:47
            Start Date: 03/Feb/23 21:47
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3635:
URL: https://github.com/apache/gobblin/pull/3635#discussion_r1096309836


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -272,6 +275,9 @@ protected void startUp() {
    * @param setStatus if true, set all jobs in the dag to pending
    */
   synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
+    if (!this.isActive) {
+      return;
+    }

Review Comment:
   Would this ever cause dags to be missed? For example, if the leader changes 
while a new dag is being added?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -405,10 +411,15 @@ public synchronized void setActive(boolean active) {
         }
         FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime);
         this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES);
-        List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
-        log.info("Loading " + dags.size() + " dags from dag state store");
-        for (Dag<JobExecutionPlan> dag : dags) {
-          addDag(dag, false, false);
+        loadingDagsFromDagStateStore();
+        this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor();
+        for (int delay = houseKeepingThreadInitialDelay; delay < 180; delay *= 2) {

Review Comment:
   Make 180 a static constant named `MAX_HOUSEKEEPING_THREAD_DELAY`?
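
   For illustration only, here is a minimal standalone sketch of the doubling 
loop with the literal pulled out into a named constant. The class name, the 
`delays` helper, and the starting value of 2 are assumptions made for this 
example; only the value 180 and the doubling step come from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

class HousekeepingDelays {
  // Constant name as suggested in the review; the value 180 is taken from the diff.
  static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;

  /** Collects the delays produced by doubling from initialDelay while below the cap. */
  static List<Integer> delays(int initialDelay) {
    if (initialDelay <= 0) {
      // Doubling a non-positive value never reaches the cap, so reject it up front.
      throw new IllegalArgumentException("initialDelay must be positive");
    }
    List<Integer> result = new ArrayList<>();
    for (int delay = initialDelay; delay < MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
      result.add(delay);
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical initial delay of 2; prints [2, 4, 8, 16, 32, 64, 128]
    System.out.println(delays(2));
  }
}
```

   The guard on `initialDelay` is an addition of this sketch: with a configured 
value of 0 the loop in the diff would never terminate.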



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -441,6 +453,14 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  private void loadingDagsFromDagStateStore() throws IOException {

Review Comment:
   Tiny grammar nit: rename to loadDagFromDagStateStore? It reads as more 
declarative.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 843586)
    Time Spent: 0.5h  (was: 20m)

> Add house keeping thread in DagManager to periodically sync in memory state 
> with mysql table 
> ---------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1778
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1778
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, each dag manager assumes that it is the only process that can 
> update the mysql table and that its in-memory state is always in sync with 
> mysql. But we have noticed that during a leader transition period, it is 
> possible for two dag managers to run concurrently and update the mysql db at 
> the same time. 
> To address that, we need to either add a lock to make sure only one dag 
> manager is working at a time, or have a housekeeping thread that 
> periodically syncs the in-memory state with the mysql table. After 
> discussion, we chose to go with the latter approach.
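
The housekeeping approach described above can be sketched roughly as follows. 
This is not the code from the PR: `InMemoryDagSync`, `syncWith`, and the use of 
plain strings in place of `Dag<JobExecutionPlan>` are all assumptions made to 
keep the example self-contained, and whether the real implementation also drops 
in-memory dags that are missing from the store is likewise assumed here.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryDagSync {
  // In-memory view: dag id -> dag payload (a String stands in for the real dag type).
  private final Map<String, String> inMemory = new ConcurrentHashMap<>();

  void put(String id, String dag) {
    inMemory.put(id, dag);
  }

  Set<String> ids() {
    return new HashSet<>(inMemory.keySet());
  }

  /**
   * Reconciles the in-memory view with a snapshot read from the persistent store.
   * Returns the number of entries that were added or removed.
   */
  synchronized int syncWith(Map<String, String> storeSnapshot) {
    int changes = 0;
    // Pick up dags that another process wrote to the store.
    for (Map.Entry<String, String> entry : storeSnapshot.entrySet()) {
      if (inMemory.putIfAbsent(entry.getKey(), entry.getValue()) == null) {
        changes++;
      }
    }
    // Drop dags that no longer exist in the store (assumed behavior for this sketch).
    for (String id : ids()) {
      if (!storeSnapshot.containsKey(id)) {
        inMemory.remove(id);
        changes++;
      }
    }
    return changes;
  }
}
```

In a running service, a call like `syncWith` would presumably be scheduled on a 
`ScheduledExecutorService`, as the diff does for its other background threads.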



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
