[
https://issues.apache.org/jira/browse/GOBBLIN-1778?focusedWorklogId=843586&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-843586
]
ASF GitHub Bot logged work on GOBBLIN-1778:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 03/Feb/23 21:47
Start Date: 03/Feb/23 21:47
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3635:
URL: https://github.com/apache/gobblin/pull/3635#discussion_r1096309836
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -272,6 +275,9 @@ protected void startUp() {
* @param setStatus if true, set all jobs in the dag to pending
*/
synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
+ if (!this.isActive) {
+ return;
+ }
Review Comment:
Would this ever cause dags to be missed, for example if the leader changes
while a new dag is being added?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -405,10 +411,15 @@ public synchronized void setActive(boolean active) {
}
FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime);
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES);
- List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
- log.info("Loading " + dags.size() + " dags from dag state store");
- for (Dag<JobExecutionPlan> dag : dags) {
- addDag(dag, false, false);
+ loadingDagsFromDagStateStore();
+ this.houseKeepingThreadPool = Executors.newSingleThreadScheduledExecutor();
+ for (int delay = houseKeepingThreadInitialDelay; delay < 180; delay *= 2) {
Review Comment:
Make 180 a static constant, e.g. `MAX_HOUSEKEEPING_THREAD_DELAY`?
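A minimal sketch of what this suggestion could look like, assuming the loop only needs the doubling sequence of delays below the cap. The class `HousekeepingDelays`, the helper `delays`, and the initial delay of 2 are hypothetical and not taken from the PR. The time unit of the delay is not stated in the quoted hunk, so none is assumed here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the DagManager code from the PR.
final class HousekeepingDelays {
  // Named constant replacing the magic number 180 from the loop condition.
  static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;

  // Returns the delays the loop would iterate over: initialDelay, doubled
  // each step, for as long as the value stays strictly below maxDelay.
  static List<Integer> delays(int initialDelay, int maxDelay) {
    if (initialDelay <= 0) {
      // A non-positive start would never grow past maxDelay by doubling.
      throw new IllegalArgumentException("initialDelay must be positive");
    }
    List<Integer> result = new ArrayList<>();
    for (int delay = initialDelay; delay < maxDelay; delay *= 2) {
      result.add(delay);
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumed initial delay of 2, for illustration only.
    System.out.println(delays(2, MAX_HOUSEKEEPING_THREAD_DELAY));
  }
}
```

With an initial delay of 2, the loop body runs for 2, 4, 8, 16, 32, 64 and 128, then stops because 256 is not below the cap.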
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -441,6 +453,14 @@ public synchronized void setActive(boolean active) {
}
}
+ private void loadingDagsFromDagStateStore() throws IOException {
Review Comment:
Tiny grammar nit: rename to `loadDagFromDagStateStore`? It reads as more
declarative that way.
Issue Time Tracking
-------------------
Worklog Id: (was: 843586)
Time Spent: 0.5h (was: 20m)
> Add house keeping thread in DagManager to periodically sync in memory state
> with mysql table
> ---------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1778
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1778
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently, the dag manager assumes that it is the only process that can
> update the MySQL table and that its in-memory state is always in sync with
> MySQL. However, we have noticed that during a leader transition, two dag
> managers can run concurrently and update the MySQL database at the same time.
> To address that, we need either to add a lock so that only one dag manager
> is working at a time, or to have a housekeeping thread that periodically
> syncs the in-memory state with the MySQL table. After discussion, we chose
> to go with the latter approach.
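
The housekeeping approach described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `DagStateSync`, `DagIdStore`, `syncOnce`, and `snapshot` are hypothetical names, and the store is reduced to a set of dag IDs rather than Gobblin's actual `DagStateStore`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of reconciling in-memory state with a
// persistent store; not Gobblin's DagManager.
final class DagStateSync {
  // Stand-in for the MySQL-backed store (assumption: only IDs matter here).
  interface DagIdStore {
    Set<String> getDagIds();
  }

  private final Set<String> inMemoryDagIds = ConcurrentHashMap.newKeySet();
  private final DagIdStore store;

  DagStateSync(DagIdStore store) {
    this.store = store;
  }

  // One housekeeping pass: make the in-memory set match the store.
  // Returns the number of IDs added or removed.
  synchronized int syncOnce() {
    Set<String> persisted = new HashSet<>(store.getDagIds());
    int changes = 0;
    // Drop IDs that are no longer present in the store.
    for (String id : new HashSet<>(inMemoryDagIds)) {
      if (!persisted.contains(id)) {
        inMemoryDagIds.remove(id);
        changes++;
      }
    }
    // Pick up IDs that appeared in the store since the last pass.
    for (String id : persisted) {
      if (inMemoryDagIds.add(id)) {
        changes++;
      }
    }
    return changes;
  }

  // Copy of the current in-memory IDs, for inspection.
  Set<String> snapshot() {
    return new HashSet<>(inMemoryDagIds);
  }
}
```

A pass like `syncOnce` could be run periodically from a `ScheduledExecutorService`, which is the kind of executor the quoted diff creates with `Executors.newSingleThreadScheduledExecutor()`.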