This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f57acabda7 [Chore] Fix JdbcRegistryTestCase might failed due to purge 
dead clients interval is too small (#16894)
f57acabda7 is described below

commit f57acabda7671c635a580f9ddf97e4a00564006a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Dec 13 11:43:20 2024 +0800

    [Chore] Fix JdbcRegistryTestCase might failed due to purge dead clients 
interval is too small (#16894)
---
 .../common/thread/ThreadUtils.java                 | 11 ++++++
 .../extract/base/server/NettyRemotingServer.java   | 41 +++++++++++++---------
 .../server/master/rpc/MasterRpcServerTest.java     |  6 +---
 .../dolphinscheduler/registry/api/Registry.java    |  4 +++
 .../plugin/registry/RegistryTestCase.java          |  5 +--
 .../plugin/registry/jdbc/JdbcRegistry.java         | 15 ++++----
 .../registry/jdbc/client/JdbcRegistryClient.java   |  6 +---
 .../registry/jdbc/server/JdbcRegistryServer.java   | 14 ++++----
 8 files changed, 60 insertions(+), 42 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 1c3cfc86b7..98cbe52052 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -80,4 +80,15 @@ public class ThreadUtils {
             log.error("Current thread sleep error", interruptedException);
         }
     }
+
+    public static void rethrowInterruptedException(InterruptedException 
interruptedException) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Current thread: " + 
Thread.currentThread().getName() + " is interrupted",
+                interruptedException);
+    }
+
+    public static void consumeInterruptedException(InterruptedException 
interruptedException) {
+        log.info("Current thread: {} is interrupted", 
Thread.currentThread().getName(), interruptedException);
+        Thread.currentThread().interrupt();
+    }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
index 246e11735c..b3f0189400 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -48,7 +49,7 @@ import io.netty.handler.timeout.IdleStateHandler;
 @Slf4j
 class NettyRemotingServer {
 
-    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+    private Channel serverBootstrapChannel;
 
     @Getter
     private final String serverName;
@@ -87,7 +88,7 @@ class NettyRemotingServer {
 
     void start() {
         if (isStarted.compareAndSet(false, true)) {
-            this.serverBootstrap
+            ServerBootstrap serverBootstrap = new ServerBootstrap()
                     .group(this.bossGroup, this.workGroup)
                     .channel(NettyUtils.getServerSocketChannelClass())
                     .option(ChannelOption.SO_REUSEADDR, true)
@@ -104,23 +105,24 @@ class NettyRemotingServer {
                         }
                     });
 
-            ChannelFuture future;
             try {
-                future = 
serverBootstrap.bind(serverConfig.getListenPort()).sync();
+                final ChannelFuture channelFuture = 
serverBootstrap.bind(serverConfig.getListenPort()).sync();
+                if (channelFuture.isSuccess()) {
+                    log.info("{} bind success at port: {}", 
serverConfig.getServerName(), serverConfig.getListenPort());
+                    this.serverBootstrapChannel = channelFuture.channel();
+                } else {
+                    throw new RemoteException(
+                            String.format("%s bind %s fail", 
serverConfig.getServerName(),
+                                    serverConfig.getListenPort()),
+                            channelFuture.cause());
+                }
+            } catch (InterruptedException it) {
+                ThreadUtils.rethrowInterruptedException(it);
             } catch (Exception e) {
                 throw new RemoteException(
                         String.format("%s bind %s fail", 
serverConfig.getServerName(), serverConfig.getListenPort()),
                         e);
             }
-
-            if (future.isSuccess()) {
-                log.info("{} bind success at port: {}", 
serverConfig.getServerName(), serverConfig.getListenPort());
-                return;
-            }
-
-            throw new RemoteException(
-                    String.format("%s bind %s fail", 
serverConfig.getServerName(), serverConfig.getListenPort()),
-                    future.cause());
         }
     }
 
@@ -144,18 +146,25 @@ class NettyRemotingServer {
 
     void close() {
         if (isStarted.compareAndSet(true, false)) {
+            log.info("{} closing", serverConfig.getServerName());
             try {
+                if (serverBootstrapChannel != null) {
+                    serverBootstrapChannel.close().sync();
+                    log.info("{} stop bind at port: {}", 
serverConfig.getServerName(), serverConfig.getListenPort());
+                }
                 if (bossGroup != null) {
                     this.bossGroup.shutdownGracefully();
                 }
                 if (workGroup != null) {
                     this.workGroup.shutdownGracefully();
                 }
-                methodInvokerExecutor.shutdown();
+                methodInvokerExecutor.shutdownNow();
+            } catch (InterruptedException it) {
+                ThreadUtils.consumeInterruptedException(it);
             } catch (Exception ex) {
-                log.error("netty server close exception", ex);
+                log.error("{} close failed", serverConfig.getServerName(), ex);
             }
-            log.info("netty server closed");
+            log.info("{} closed", serverConfig.getServerName());
         }
     }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
index 1e5a77edb3..649801d270 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
@@ -27,12 +27,8 @@ class MasterRpcServerTest {
     private final MasterRpcServer masterRpcServer = new MasterRpcServer(new 
MasterConfig());
 
     @Test
-    void testStart() {
+    void testStartAndClose() {
         Assertions.assertDoesNotThrow(masterRpcServer::start);
-    }
-
-    @Test
-    void testClose() {
         Assertions.assertDoesNotThrow(masterRpcServer::close);
     }
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
index 994347366f..7c0a4c3ae1 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.registry.api;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
 
@@ -109,4 +110,7 @@ public interface Registry extends Closeable {
      * Release the lock of the prefix {@param key}
      */
     boolean releaseLock(String key);
+
+    @Override
+    void close() throws IOException;
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
index 3d0c169e59..a9a23dfa0f 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
@@ -28,6 +28,7 @@ import 
org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -172,8 +173,8 @@ public abstract class RegistryTestCase<R extends Registry> {
         registry.put(master1, value, true);
         registry.put(master2, value, true);
         
assertThat(registry.children("/nodes/children")).containsExactly("childGroup1");
-        
assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080",
-                "127.0.0.2:8080");
+        
assertThat(registry.children("/nodes/children/childGroup1")).containsExactlyElementsIn(
+                Arrays.asList("127.0.0.1:8080", "127.0.0.2:8080"));
     }
 
     @Test
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
index 11e3f62172..bd468f58b7 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
@@ -58,8 +58,8 @@ public final class JdbcRegistry implements Registry {
     JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, 
IJdbcRegistryServer jdbcRegistryServer) {
         this.jdbcRegistryProperties = jdbcRegistryProperties;
         this.jdbcRegistryServer = jdbcRegistryServer;
-        this.jdbcRegistryClient = new 
JdbcRegistryClient(jdbcRegistryProperties, jdbcRegistryServer);
-        log.info("Initialize Jdbc Registry...");
+        this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryServer);
+        log.info("Initialized Jdbc Registry...");
     }
 
     @Override
@@ -259,13 +259,14 @@ public final class JdbcRegistry implements Registry {
 
     @Override
     public void close() {
-        log.info("Closing Jdbc Registry...");
+        log.info("Closing JdbcRegistry...");
         // remove the current Ephemeral node, if can connect to jdbc
-        try (JdbcRegistryClient closed1 = jdbcRegistryClient) {
-            
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
+        
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
+        try (final JdbcRegistryClient closed1 = jdbcRegistryClient) {
+            // ignore
         } catch (Exception e) {
-            log.error("Close Jdbc Registry error", e);
+            log.error("Close JdbcRegistry error", e);
         }
-        log.info("Closed Jdbc Registry...");
+        log.info("Closed JdbcRegistry...");
     }
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
index b00424c8e4..e441547444 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
@@ -20,7 +20,6 @@ package 
org.apache.dolphinscheduler.plugin.registry.jdbc.client;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
 import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType;
 import 
org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO;
 import 
org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener;
@@ -41,14 +40,11 @@ public class JdbcRegistryClient implements 
IJdbcRegistryClient {
 
     private static final String DEFAULT_CLIENT_NAME = NetUtils.getHost() + "_" 
+ OSUtils.getProcessID();
 
-    private final JdbcRegistryProperties jdbcRegistryProperties;
-
     private final JdbcRegistryClientIdentify jdbcRegistryClientIdentify;
 
     private final IJdbcRegistryServer jdbcRegistryServer;
 
-    public JdbcRegistryClient(JdbcRegistryProperties jdbcRegistryProperties, 
IJdbcRegistryServer jdbcRegistryServer) {
-        this.jdbcRegistryProperties = jdbcRegistryProperties;
+    public JdbcRegistryClient(IJdbcRegistryServer jdbcRegistryServer) {
         this.jdbcRegistryServer = jdbcRegistryServer;
         this.jdbcRegistryClientIdentify =
                 new JdbcRegistryClientIdentify(CodeGenerateUtils.genCode(), 
DEFAULT_CLIENT_NAME);
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
index e04360bc6f..f060abf5ba 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
@@ -107,8 +107,8 @@ public class JdbcRegistryServer implements 
IJdbcRegistryServer {
         purgeInvalidJdbcRegistryMetadata();
         
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
                 this::purgeInvalidJdbcRegistryMetadata,
-                
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
-                
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
+                jdbcRegistryProperties.getSessionTimeout().toMillis(),
+                jdbcRegistryProperties.getSessionTimeout().toMillis(),
                 TimeUnit.MILLISECONDS);
         jdbcRegistryDataManager.start();
         jdbcRegistryServerState = JdbcRegistryServerState.STARTED;
@@ -149,13 +149,13 @@ public class JdbcRegistryServer implements 
IJdbcRegistryServer {
     @Override
     public void deregisterClient(IJdbcRegistryClient jdbcRegistryClient) {
         checkNotNull(jdbcRegistryClient);
-        jdbcRegistryClients.remove(jdbcRegistryClient);
-        
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());
+        final JdbcRegistryClientIdentify clientIdentify = 
jdbcRegistryClient.getJdbcRegistryClientIdentify();
+        checkNotNull(clientIdentify);
 
-        JdbcRegistryClientIdentify jdbcRegistryClientIdentify = 
jdbcRegistryClient.getJdbcRegistryClientIdentify();
-        checkNotNull(jdbcRegistryClientIdentify);
+        jdbcRegistryClients.removeIf(client -> 
clientIdentify.equals(client.getJdbcRegistryClientIdentify()));
+        
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());
 
-        
doPurgeJdbcRegistryClientInDB(Lists.newArrayList(jdbcRegistryClientIdentify.getClientId()));
+        
doPurgeJdbcRegistryClientInDB(Lists.newArrayList(clientIdentify.getClientId()));
     }
 
     @Override

Reply via email to