This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch release-2.0.2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/release-2.0.2.1 by this push:
new ab3de52b99f (pick-2.0.2.1)[Fix](MySqlLoad)Fixed the MySqlLoad will
create a new thread every time checkpoint is made #26031 (#26136)
ab3de52b99f is described below
commit ab3de52b99f350a21075c7dabbd6e70c9581917d
Author: Calvin Kirs <[email protected]>
AuthorDate: Tue Oct 31 12:00:46 2023 +0800
(pick-2.0.2.1)[Fix](MySqlLoad)Fixed the MySqlLoad will create a new thread
every time checkpoint is made #26031 (#26136)
---
.../main/java/org/apache/doris/catalog/Env.java | 2 +
.../apache/doris/common/CustomThreadFactory.java | 46 ++++++++++++++++++++++
.../org/apache/doris/load/loadv2/LoadManager.java | 5 +++
.../apache/doris/load/loadv2/MysqlLoadManager.java | 17 +++++---
.../org/apache/doris/load/loadv2/TokenManager.java | 18 +++++----
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 13 ++++--
.../org/apache/doris/mtmv/MTMVTaskManager.java | 6 ++-
7 files changed, 89 insertions(+), 18 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index ea4410f60c6..9e17b2ee7ca 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1507,6 +1507,8 @@ public class Env {
// start threads that should running on all FE
private void startNonMasterDaemonThreads() {
+ // start load manager thread
+ loadManager.start();
tabletStatMgr.start();
// load and export job label cleaner thread
labelCleaner.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java
new file mode 100644
index 00000000000..153131ec251
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CustomThreadFactory implements ThreadFactory {
+ private final AtomicInteger poolNumber = new AtomicInteger(1);
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ public CustomThreadFactory(String name) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+ if (t.isDaemon()) {
+ t.setDaemon(false);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 73d4d1a57ad..3f05e2c5a98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -104,6 +104,11 @@ public class LoadManager implements Writable {
this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
}
+ public void start() {
+ tokenManager.start();
+ mysqlLoadManager.start();
+ }
+
/**
* This method will be invoked by the broker load(v2) now.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index ae1871e0d9c..42583506c56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
@@ -72,7 +73,7 @@ import java.util.concurrent.TimeUnit;
public class MysqlLoadManager {
private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class);
- private final ThreadPoolExecutor mysqlLoadPool;
+ private ThreadPoolExecutor mysqlLoadPool;
private final TokenManager tokenManager;
private static class MySqlLoadContext {
@@ -137,14 +138,20 @@ public class MysqlLoadManager {
}
private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
- private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
- private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
+ private EvictingQueue<MySqlLoadFailRecord> failedRecords;
+ private ScheduledExecutorService periodScheduler;
public MysqlLoadManager(TokenManager tokenManager) {
+ this.tokenManager = tokenManager;
+ }
+
+ public void start() {
+ this.periodScheduler = Executors.newScheduledThreadPool(1,
+ new CustomThreadFactory("mysql-load-fail-record-cleaner"));
int poolSize = Config.mysql_load_thread_pool;
// MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default.
- this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true);
- this.tokenManager = tokenManager;
+ this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5,
+ "Mysql Load", true);
this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record);
this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
index f4cf4518212..80f6c3f9b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.FrontendService;
@@ -41,18 +42,21 @@ import java.util.concurrent.TimeUnit;
public class TokenManager {
private static final Logger LOG = LogManager.getLogger(TokenManager.class);
- private final int thriftTimeoutMs = 300 * 1000;
- private final EvictingQueue<String> tokenQueue;
- private final ScheduledExecutorService tokenGenerator;
+ private int thriftTimeoutMs = 300 * 1000;
+ private EvictingQueue<String> tokenQueue;
+ private ScheduledExecutorService tokenGenerator;
public TokenManager() {
+ }
+
+ public void start() {
this.tokenQueue = EvictingQueue.create(Config.token_queue_size);
// init one token to avoid async issue.
this.tokenQueue.offer(generateNewToken());
- this.tokenGenerator = Executors.newScheduledThreadPool(1);
- this.tokenGenerator.scheduleAtFixedRate(() -> {
- tokenQueue.offer(generateNewToken());
- }, 0, Config.token_generate_period_hour, TimeUnit.HOURS);
+ this.tokenGenerator = Executors.newScheduledThreadPool(1,
+ new CustomThreadFactory("token-generator"));
+ this.tokenGenerator.scheduleAtFixedRate(() -> tokenQueue.offer(generateNewToken()), 0,
+ Config.token_generate_period_hour, TimeUnit.HOURS);
}
private String generateNewToken() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index 7295f40b602..b58f26b863a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
@@ -66,9 +67,11 @@ public class MTMVJobManager {
private final MTMVTaskManager taskManager;
- private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
+ private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1,
+ new CustomThreadFactory("mtmv-job-period-scheduler"));
- private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1);
+ private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1,
+ new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
private final ReentrantReadWriteLock rwLock;
@@ -86,13 +89,15 @@ public class MTMVJobManager {
// check the scheduler before using it
// since it may be shutdown when master change to follower without process shutdown.
if (periodScheduler.isShutdown()) {
- periodScheduler = Executors.newScheduledThreadPool(1);
+ periodScheduler = Executors.newScheduledThreadPool(1,
+ new CustomThreadFactory("mtmv-job-period-scheduler"));
}
registerJobs();
if (cleanerScheduler.isShutdown()) {
- cleanerScheduler = Executors.newScheduledThreadPool(1);
+ cleanerScheduler = Executors.newScheduledThreadPool(1,
+ new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
}
cleanerScheduler.scheduleAtFixedRate(() -> {
if (!Env.getCurrentEnv().isMaster()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index d6e370480bb..138ede9e075 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.mtmv;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
@@ -65,13 +66,14 @@ public class MTMVTaskManager {
// keep track of all the completed tasks
private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque();
- private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1);
+ private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1,
+ new CustomThreadFactory("mtmv-task-scheduler"));
private final AtomicInteger failedTaskCount = new AtomicInteger(0);
public void startTaskScheduler() {
if (taskScheduler.isShutdown()) {
- taskScheduler = Executors.newScheduledThreadPool(1);
+ taskScheduler = Executors.newScheduledThreadPool(1, new CustomThreadFactory("mtmv-task-scheduler"));
}
taskScheduler.scheduleAtFixedRate(() -> {
checkRunningTask();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]