This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f57acabda7 [Chore] Fix JdbcRegistryTestCase might failed due to purge
dead clients interval is too small (#16894)
f57acabda7 is described below
commit f57acabda7671c635a580f9ddf97e4a00564006a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Dec 13 11:43:20 2024 +0800
[Chore] Fix JdbcRegistryTestCase might failed due to purge dead clients
interval is too small (#16894)
---
.../common/thread/ThreadUtils.java | 11 ++++++
.../extract/base/server/NettyRemotingServer.java | 41 +++++++++++++---------
.../server/master/rpc/MasterRpcServerTest.java | 6 +---
.../dolphinscheduler/registry/api/Registry.java | 4 +++
.../plugin/registry/RegistryTestCase.java | 5 +--
.../plugin/registry/jdbc/JdbcRegistry.java | 15 ++++----
.../registry/jdbc/client/JdbcRegistryClient.java | 6 +---
.../registry/jdbc/server/JdbcRegistryServer.java | 14 ++++----
8 files changed, 60 insertions(+), 42 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 1c3cfc86b7..98cbe52052 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -80,4 +80,15 @@ public class ThreadUtils {
log.error("Current thread sleep error", interruptedException);
}
}
+
+ public static void rethrowInterruptedException(InterruptedException
interruptedException) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Current thread: " +
Thread.currentThread().getName() + " is interrupted",
+ interruptedException);
+ }
+
+ public static void consumeInterruptedException(InterruptedException
interruptedException) {
+ log.info("Current thread: {} is interrupted",
Thread.currentThread().getName(), interruptedException);
+ Thread.currentThread().interrupt();
+ }
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
index 246e11735c..b3f0189400 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -48,7 +49,7 @@ import io.netty.handler.timeout.IdleStateHandler;
@Slf4j
class NettyRemotingServer {
- private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+ private Channel serverBootstrapChannel;
@Getter
private final String serverName;
@@ -87,7 +88,7 @@ class NettyRemotingServer {
void start() {
if (isStarted.compareAndSet(false, true)) {
- this.serverBootstrap
+ ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
@@ -104,23 +105,24 @@ class NettyRemotingServer {
}
});
- ChannelFuture future;
try {
- future =
serverBootstrap.bind(serverConfig.getListenPort()).sync();
+ final ChannelFuture channelFuture =
serverBootstrap.bind(serverConfig.getListenPort()).sync();
+ if (channelFuture.isSuccess()) {
+ log.info("{} bind success at port: {}",
serverConfig.getServerName(), serverConfig.getListenPort());
+ this.serverBootstrapChannel = channelFuture.channel();
+ } else {
+ throw new RemoteException(
+ String.format("%s bind %s fail",
serverConfig.getServerName(),
+ serverConfig.getListenPort()),
+ channelFuture.cause());
+ }
+ } catch (InterruptedException it) {
+ ThreadUtils.rethrowInterruptedException(it);
} catch (Exception e) {
throw new RemoteException(
String.format("%s bind %s fail",
serverConfig.getServerName(), serverConfig.getListenPort()),
e);
}
-
- if (future.isSuccess()) {
- log.info("{} bind success at port: {}",
serverConfig.getServerName(), serverConfig.getListenPort());
- return;
- }
-
- throw new RemoteException(
- String.format("%s bind %s fail",
serverConfig.getServerName(), serverConfig.getListenPort()),
- future.cause());
}
}
@@ -144,18 +146,25 @@ class NettyRemotingServer {
void close() {
if (isStarted.compareAndSet(true, false)) {
+ log.info("{} closing", serverConfig.getServerName());
try {
+ if (serverBootstrapChannel != null) {
+ serverBootstrapChannel.close().sync();
+ log.info("{} stop bind at port: {}",
serverConfig.getServerName(), serverConfig.getListenPort());
+ }
if (bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
- methodInvokerExecutor.shutdown();
+ methodInvokerExecutor.shutdownNow();
+ } catch (InterruptedException it) {
+ ThreadUtils.consumeInterruptedException(it);
} catch (Exception ex) {
- log.error("netty server close exception", ex);
+ log.error("{} close failed", serverConfig.getServerName(), ex);
}
- log.info("netty server closed");
+ log.info("{} closed", serverConfig.getServerName());
}
}
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
index 1e5a77edb3..649801d270 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
@@ -27,12 +27,8 @@ class MasterRpcServerTest {
private final MasterRpcServer masterRpcServer = new MasterRpcServer(new
MasterConfig());
@Test
- void testStart() {
+ void testStartAndClose() {
Assertions.assertDoesNotThrow(masterRpcServer::start);
- }
-
- @Test
- void testClose() {
Assertions.assertDoesNotThrow(masterRpcServer::close);
}
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
index 994347366f..7c0a4c3ae1 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.registry.api;
import java.io.Closeable;
+import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
@@ -109,4 +110,7 @@ public interface Registry extends Closeable {
* Release the lock of the prefix {@param key}
*/
boolean releaseLock(String key);
+
+ @Override
+ void close() throws IOException;
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
index 3d0c169e59..a9a23dfa0f 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
@@ -28,6 +28,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import java.time.Duration;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -172,8 +173,8 @@ public abstract class RegistryTestCase<R extends Registry> {
registry.put(master1, value, true);
registry.put(master2, value, true);
assertThat(registry.children("/nodes/children")).containsExactly("childGroup1");
-
assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080",
- "127.0.0.2:8080");
+
assertThat(registry.children("/nodes/children/childGroup1")).containsExactlyElementsIn(
+ Arrays.asList("127.0.0.1:8080", "127.0.0.2:8080"));
}
@Test
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
index 11e3f62172..bd468f58b7 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
@@ -58,8 +58,8 @@ public final class JdbcRegistry implements Registry {
JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties,
IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryProperties = jdbcRegistryProperties;
this.jdbcRegistryServer = jdbcRegistryServer;
- this.jdbcRegistryClient = new
JdbcRegistryClient(jdbcRegistryProperties, jdbcRegistryServer);
- log.info("Initialize Jdbc Registry...");
+ this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryServer);
+ log.info("Initialized Jdbc Registry...");
}
@Override
@@ -259,13 +259,14 @@ public final class JdbcRegistry implements Registry {
@Override
public void close() {
- log.info("Closing Jdbc Registry...");
+ log.info("Closing JdbcRegistry...");
// remove the current Ephemeral node, if can connect to jdbc
- try (JdbcRegistryClient closed1 = jdbcRegistryClient) {
-
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
+
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
+ try (final JdbcRegistryClient closed1 = jdbcRegistryClient) {
+ // ignore
} catch (Exception e) {
- log.error("Close Jdbc Registry error", e);
+ log.error("Close JdbcRegistry error", e);
}
- log.info("Closed Jdbc Registry...");
+ log.info("Closed JdbcRegistry...");
}
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
index b00424c8e4..e441547444 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java
@@ -20,7 +20,6 @@ package
org.apache.dolphinscheduler.plugin.registry.jdbc.client;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType;
import
org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO;
import
org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener;
@@ -41,14 +40,11 @@ public class JdbcRegistryClient implements
IJdbcRegistryClient {
private static final String DEFAULT_CLIENT_NAME = NetUtils.getHost() + "_"
+ OSUtils.getProcessID();
- private final JdbcRegistryProperties jdbcRegistryProperties;
-
private final JdbcRegistryClientIdentify jdbcRegistryClientIdentify;
private final IJdbcRegistryServer jdbcRegistryServer;
- public JdbcRegistryClient(JdbcRegistryProperties jdbcRegistryProperties,
IJdbcRegistryServer jdbcRegistryServer) {
- this.jdbcRegistryProperties = jdbcRegistryProperties;
+ public JdbcRegistryClient(IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryServer = jdbcRegistryServer;
this.jdbcRegistryClientIdentify =
new JdbcRegistryClientIdentify(CodeGenerateUtils.genCode(),
DEFAULT_CLIENT_NAME);
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
index e04360bc6f..f060abf5ba 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java
@@ -107,8 +107,8 @@ public class JdbcRegistryServer implements
IJdbcRegistryServer {
purgeInvalidJdbcRegistryMetadata();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::purgeInvalidJdbcRegistryMetadata,
-
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
-
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
+ jdbcRegistryProperties.getSessionTimeout().toMillis(),
+ jdbcRegistryProperties.getSessionTimeout().toMillis(),
TimeUnit.MILLISECONDS);
jdbcRegistryDataManager.start();
jdbcRegistryServerState = JdbcRegistryServerState.STARTED;
@@ -149,13 +149,13 @@ public class JdbcRegistryServer implements
IJdbcRegistryServer {
@Override
public void deregisterClient(IJdbcRegistryClient jdbcRegistryClient) {
checkNotNull(jdbcRegistryClient);
- jdbcRegistryClients.remove(jdbcRegistryClient);
-
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());
+ final JdbcRegistryClientIdentify clientIdentify =
jdbcRegistryClient.getJdbcRegistryClientIdentify();
+ checkNotNull(clientIdentify);
- JdbcRegistryClientIdentify jdbcRegistryClientIdentify =
jdbcRegistryClient.getJdbcRegistryClientIdentify();
- checkNotNull(jdbcRegistryClientIdentify);
+ jdbcRegistryClients.removeIf(client ->
clientIdentify.equals(client.getJdbcRegistryClientIdentify()));
+
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());
-
doPurgeJdbcRegistryClientInDB(Lists.newArrayList(jdbcRegistryClientIdentify.getClientId()));
+
doPurgeJdbcRegistryClientInDB(Lists.newArrayList(clientIdentify.getClientId()));
}
@Override