This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-0.15 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit b1bc9f878f54e8f40c7599de1c307ae8c1e3bfeb Author: xy720 <[email protected]> AuthorDate: Fri Nov 12 10:53:50 2021 +0800 [Feature] Clean up old sync jobs regularly (#7061) #7060 #6287 Each job that has been stopped for more than 3 days (set with Config.label_keep_max_second) will be permanently cleaned up. --- .../org/apache/doris/load/sync/SyncChecker.java | 8 ++++- .../java/org/apache/doris/load/sync/SyncJob.java | 17 ++++++++++ .../org/apache/doris/load/sync/SyncJobManager.java | 38 ++++++++++++++++++++++ .../apache/doris/load/sync/SyncJobManagerTest.java | 36 ++++++++++++++++++++ 4 files changed, 98 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java index 422fe48..8b64f34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java @@ -32,7 +32,7 @@ import java.util.List; public class SyncChecker extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(SyncChecker.class); - private SyncJobManager syncJobManager; + private final SyncJobManager syncJobManager; public SyncChecker(SyncJobManager syncJobManager) { super("sync checker", Config.sync_checker_interval_second * 1000L); @@ -44,6 +44,7 @@ public class SyncChecker extends MasterDaemon { LOG.debug("start check sync jobs."); try { process(); + cleanOldSyncJobs(); } catch (Throwable e) { LOG.warn("Failed to process one round of SyncChecker", e); } @@ -74,4 +75,9 @@ public class SyncChecker extends MasterDaemon { } } } + + private void cleanOldSyncJobs() { + // clean up expired sync jobs + this.syncJobManager.cleanOldSyncJobs(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java index c0b0321..3565c04 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.CreateDataSyncJobStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; @@ -266,6 +267,18 @@ public abstract class SyncJob implements Writable { return "\\N"; } + public boolean isExpired(long currentTimeMs) { + if (!isCompleted()) { + return false; + } + Preconditions.checkState(finishTimeMs != -1L); + long expireTime = Config.label_keep_max_second * 1000L; + if ((currentTimeMs - finishTimeMs) > expireTime) { + return true; + } + return false; + } + // only use for persist when job state changed public static class SyncJobUpdateStateInfo implements Writable { @SerializedName(value = "id") @@ -450,4 +463,8 @@ public abstract class SyncJob implements Writable { public List<ChannelDescription> getChannelDescriptions() { return this.channelDescriptions; } + + public long getFinishTimeMs() { + return finishTimeMs; + } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java index 1a0d8b8..5ac5049 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java @@ -41,6 +41,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -279,6 +280,43 @@ public class SyncJobManager implements Writable { } } + // Remove old sync jobs. Called periodically. 
+ // Stopped jobs will be removed after Config.label_keep_max_second. + public void cleanOldSyncJobs() { + LOG.debug("begin to clean old sync jobs "); + long currentTimeMs = System.currentTimeMillis(); + writeLock(); + try { + Iterator<Map.Entry<Long, SyncJob>> iterator = idToSyncJob.entrySet().iterator(); + while (iterator.hasNext()) { + SyncJob syncJob = iterator.next().getValue(); + if (syncJob.isExpired(currentTimeMs)) { + if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) { + continue; + } + Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(syncJob.getDbId()); + List<SyncJob> list = map.get(syncJob.getJobName()); + list.remove(syncJob); + if (list.isEmpty()) { + map.remove(syncJob.getJobName()); + } + if (map.isEmpty()) { + dbIdToJobNameToSyncJobs.remove(syncJob.getDbId()); + } + iterator.remove(); + LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId()) + .add("finishTimeMs", syncJob.getFinishTimeMs()) + .add("currentTimeMs", currentTimeMs) + .add("jobState", syncJob.getJobState()) + .add("msg", "old sync job has been cleaned") + ); + } + } + } finally { + writeUnlock(); + } + } + public SyncJob getSyncJobById(long jobId) { return idToSyncJob.get(jobId); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java index 8fa080f..457fe43 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java @@ -405,5 +405,41 @@ public class SyncJobManagerTest { Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType()); } + @Test + public void testCleanOldSyncJobs() { + SyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId); + // change sync job state to cancelled + try { + canalSyncJob.updateState(JobState.CANCELLED, false); + } catch (UserException e) { + Assert.fail(); + } + 
Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState()); + + SyncJobManager manager = new SyncJobManager(); + + // add a sync job to manager + Map<Long, SyncJob> idToSyncJob = Maps.newHashMap(); + idToSyncJob.put(jobId, canalSyncJob); + Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap(); + Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap(); + jobNameToSyncJobs.put(jobName, Lists.newArrayList(canalSyncJob)); + dbIdToJobNameToSyncJobs.put(dbId, jobNameToSyncJobs); + + Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob); + Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs); + + new Expectations(canalSyncJob) { + { + canalSyncJob.isExpired(anyLong); + result = true; + } + }; + manager.cleanOldSyncJobs(); + + Assert.assertEquals(0, idToSyncJob.size()); + Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size()); + } + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
