This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 736f027c9c [ISSUE #9870] Ensure metadata provider cache executors are shutdown correctly (#9871)
736f027c9c is described below
commit 736f027c9c763a804cca5114c484dd9653516464
Author: majialong <[email protected]>
AuthorDate: Mon Nov 24 10:34:43 2025 +0800
[ISSUE #9870] Ensure metadata provider cache executors are shutdown
correctly (#9871)
* [ISSUE #9870] Ensure metadata provider cache executors are shutdown
correctly
* Improve test comments
---
.../LocalAuthenticationMetadataProvider.java | 7 ++-
.../LocalAuthorizationMetadataProvider.java | 7 ++-
.../LocalAuthenticationMetadataProviderTest.java | 50 ++++++++++++++++++++++
.../LocalAuthorizationMetadataProviderTest.java | 50 ++++++++++++++++++++++
4 files changed, 112 insertions(+), 2 deletions(-)
diff --git
a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
index 04e745eaaf..93d0327271 100644
---
a/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
+++
b/auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java
@@ -46,6 +46,8 @@ public class LocalAuthenticationMetadataProvider implements
AuthenticationMetada
private LoadingCache<String, User> userCache;
+ protected ThreadPoolExecutor cacheRefreshExecutor;
+
@Override
public void initialize(AuthConfig authConfig, Supplier<?> metadataService)
{
this.storage =
ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator +
"users", false);
@@ -53,7 +55,7 @@ public class LocalAuthenticationMetadataProvider implements
AuthenticationMetada
throw new RuntimeException("Failed to load rocksdb for auth_user,
please check whether it is occupied");
}
- ThreadPoolExecutor cacheRefreshExecutor =
ThreadPoolMonitor.createAndMonitor(
+ this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
1,
1,
1000 * 60,
@@ -144,6 +146,9 @@ public class LocalAuthenticationMetadataProvider implements
AuthenticationMetada
if (this.storage != null) {
this.storage.shutdown();
}
+ if (this.cacheRefreshExecutor != null) {
+ this.cacheRefreshExecutor.shutdown();
+ }
}
private static class UserCacheLoader implements CacheLoader<String, User> {
diff --git
a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
index 6db999bee7..f6b8ecaf3d 100644
---
a/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
+++
b/auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java
@@ -51,13 +51,15 @@ public class LocalAuthorizationMetadataProvider implements
AuthorizationMetadata
private LoadingCache<String, Acl> aclCache;
+ protected ThreadPoolExecutor cacheRefreshExecutor;
+
@Override
public void initialize(AuthConfig authConfig, Supplier<?> metadataService)
{
this.storage =
ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator +
"acls", false);
if (!this.storage.start()) {
throw new RuntimeException("Failed to load rocksdb for auth_acl,
please check whether it is occupied.");
}
- ThreadPoolExecutor cacheRefreshExecutor =
ThreadPoolMonitor.createAndMonitor(
+ this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
1,
1,
1000 * 60,
@@ -172,6 +174,9 @@ public class LocalAuthorizationMetadataProvider implements
AuthorizationMetadata
if (this.storage != null) {
this.storage.shutdown();
}
+ if (this.cacheRefreshExecutor != null) {
+ this.cacheRefreshExecutor.shutdown();
+ }
}
private static class AclCacheLoader implements CacheLoader<String, Acl> {
diff --git
a/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java
b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java
new file mode 100644
index 0000000000..15ec8c3260
--- /dev/null
+++
b/auth/src/test/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProviderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.auth.authentication.provider;
+
+import org.apache.rocketmq.auth.config.AuthConfig;
+import org.apache.rocketmq.auth.helper.AuthTestHelper;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class LocalAuthenticationMetadataProviderTest {
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testShutdownReleasesCacheExecutor() throws Exception {
+ AuthConfig authConfig = AuthTestHelper.createDefaultConfig();
+
authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath());
+
+ LocalAuthenticationMetadataProvider provider = new
LocalAuthenticationMetadataProvider();
+ // Initialize provider to create the internal cache refresh executor
+ provider.initialize(authConfig, () -> null);
+
+ // After initialization, the executor should exist and not be shutdown
+ Assert.assertNotNull(provider.cacheRefreshExecutor);
+ Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown());
+
+ // Shutdown provider should also shutdown its executor to release resources
+ provider.shutdown();
+
+ // Verify that the cache refresh executor has been shutdown
+ Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown());
+ }
+}
diff --git
a/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java
b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java
new file mode 100644
index 0000000000..32771a4d80
--- /dev/null
+++
b/auth/src/test/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProviderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.auth.authorization.provider;
+
+import org.apache.rocketmq.auth.config.AuthConfig;
+import org.apache.rocketmq.auth.helper.AuthTestHelper;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class LocalAuthorizationMetadataProviderTest {
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testShutdownReleasesCacheExecutor() throws Exception {
+ AuthConfig authConfig = AuthTestHelper.createDefaultConfig();
+
authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath());
+
+ LocalAuthorizationMetadataProvider provider = new
LocalAuthorizationMetadataProvider();
+ // Initialize provider to create the internal cache refresh executor
+ provider.initialize(authConfig, () -> null);
+
+ // After initialization, the executor should exist and not be shutdown
+ Assert.assertNotNull(provider.cacheRefreshExecutor);
+ Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown());
+
+ // Shutdown provider should also shutdown its executor to release resources
+ provider.shutdown();
+
+ // Verify that the cache refresh executor has been shutdown
+ Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown());
+ }
+}