This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.0.0-beta-2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.0-beta-2-prepare by this 
push:
     new 742944c60d [Fix] [Worker] Fix worker will hang if fails to start  
(#10501)
742944c60d is described below

commit 742944c60d0bc59b79931b10dd4d41bd7e40fac5
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Jun 22 10:13:55 2022 +0800

    [Fix] [Worker] Fix worker will hang if fails to start  (#10501)
    
    * [Fix] [Worker] Fix worker will hang if fails to start (#10342)
    
    * Fix worker will hang if fails to start
    
    * Add .run to ignore
    
    Signed-off-by: ruanwenjun <[email protected]>
    
    * Add import Epoll
    
    * Add cpu_quota in h2 to fix UT
    
    * Remove cpu_quota, memory_max in TaskDefinitionMapper
---
 .gitignore                                               |  1 +
 .../dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml |  2 +-
 .../plugin/datasource/hive/HiveDataSourceClient.java     |  4 +++-
 .../server/log/LoggerRequestProcessor.java               | 16 ++++++++++------
 .../dolphinscheduler/remote/NettyRemotingServer.java     |  7 ++++---
 5 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/.gitignore b/.gitignore
index 2452b3ecc1..327269484a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 .DS_Store
 .target
 .idea/
+.run/
 target/
 dist/
 all-dependencies.txt
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 255be38b2f..d7cf5f11bd 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -28,7 +28,7 @@
         ${alias}.task_type, ${alias}.task_params, ${alias}.flag, 
${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
         ${alias}.fail_retry_times, ${alias}.fail_retry_interval, 
${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
         ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, 
${alias}.update_time, ${alias}.task_group_id,
-        ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max
+        ${alias}.task_group_priority
     </sql>
     <select id="queryByName" 
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
         select
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index 18a0490033..a45e839f80 100644
--- 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.hive;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.zaxxer.hikari.HikariDataSource;
 import 
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
 import 
org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
@@ -60,7 +61,8 @@ public class HiveDataSourceClient extends 
CommonDataSourceClient {
     @Override
     protected void preInit() {
         logger.info("PreInit in {}", getClass().getName());
-        this.kerberosRenewalService = 
Executors.newSingleThreadScheduledExecutor();
+        this.kerberosRenewalService = 
Executors.newSingleThreadScheduledExecutor(
+                new 
ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
     }
 
     @Override
diff --git 
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
 
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index afc914ef05..1ab4aa13fe 100644
--- 
a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ 
b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -31,6 +31,7 @@ import 
org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -54,6 +55,8 @@ import org.springframework.stereotype.Component;
 
 import io.netty.channel.Channel;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * logger request process logic
  */
@@ -65,7 +68,8 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
     private final ExecutorService executor;
 
     public LoggerRequestProcessor() {
-        this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1);
+        this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1,
+                new NamedThreadFactory("Log-Request-Process-Thread"));
     }
 
     @Override
@@ -80,7 +84,7 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
                         command.getBody(), GetLogBytesRequestCommand.class);
                 String path = getLogRequest.getPath();
                 if (!checkPathSecurity(path)) {
-                    throw new IllegalArgumentException("Illegal path");
+                    throw new IllegalArgumentException("Illegal path: " + 
path);
                 }
                 byte[] bytes = getFileContentBytes(path);
                 GetLogBytesResponseCommand getLogResponse = new 
GetLogBytesResponseCommand(bytes);
@@ -91,7 +95,7 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
                         command.getBody(), ViewLogRequestCommand.class);
                 String viewLogPath = viewLogRequest.getPath();
                 if (!checkPathSecurity(viewLogPath)) {
-                    throw new IllegalArgumentException("Illegal path");
+                    throw new IllegalArgumentException("Illegal path: " + 
viewLogPath);
                 }
                 String msg = LoggerUtils.readWholeFileContent(viewLogPath);
                 ViewLogResponseCommand viewLogResponse = new 
ViewLogResponseCommand(msg);
@@ -103,7 +107,7 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
 
                 String rollViewLogPath = rollViewLogRequest.getPath();
                 if (!checkPathSecurity(rollViewLogPath)) {
-                    throw new IllegalArgumentException("Illegal path");
+                    throw new IllegalArgumentException("Illegal path: " + 
rollViewLogPath);
                 }
 
                 List<String> lines = readPartFileContent(rollViewLogPath,
@@ -121,7 +125,7 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
 
                 String taskLogPath = removeTaskLogRequest.getPath();
                 if (!checkPathSecurity(taskLogPath)) {
-                    throw new IllegalArgumentException("Illegal path");
+                    throw new IllegalArgumentException("Illegal path: " + 
taskLogPath);
                 }
                 File taskLogFile = new File(taskLogPath);
                 boolean status = true;
@@ -137,7 +141,7 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
                 
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
                 break;
             default:
-                throw new IllegalArgumentException("unknown commandType");
+                throw new IllegalArgumentException("unknown commandType: " + 
commandType);
         }
     }
 
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index 665779dbe3..098751ab65 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -43,6 +43,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -102,9 +103,9 @@ public class NettyRemotingServer {
      */
     public NettyRemotingServer(final NettyServerConfig serverConfig) {
         this.serverConfig = serverConfig;
-        ThreadFactory bossThreadFactory = new 
ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build();
-        ThreadFactory workerThreadFactory = new 
ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build();
-        if (NettyUtils.useEpoll()) {
+        ThreadFactory bossThreadFactory = new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build();
+        ThreadFactory workerThreadFactory = new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build();
+        if (Epoll.isAvailable()) {
             this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
             this.workGroup = new 
EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
         } else {

Reply via email to