This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 18c39ff593 [INLONG-8961][Manager] Server-side request forgery attack 
prevention in some classes (#9012)
18c39ff593 is described below

commit 18c39ff593b1afb71063d85dc89adc4d589516cf
Author: sunrisefromdark <[email protected]>
AuthorDate: Wed Oct 11 22:03:53 2023 +0800

    [INLONG-8961][Manager] Server-side request forgery attack prevention in 
some classes (#9012)
---
 .../manager/common/util/UrlVerificationUtils.java  |  58 +++++++
 .../pojo/sink/ck/ClickHouseSinkRequest.java        |   4 +
 .../pojo/sink/postgresql/PostgreSQLSinkDTO.java    |   6 +
 .../resource/sink/ck/ClickHouseJdbcUtils.java      |  41 +++--
 .../service/resource/sink/hive/HiveJdbcUtils.java  |  44 +++--
 .../resource/sink/mysql/MySQLJdbcUtils.java        |  28 +--
 .../sink/postgresql/PostgreSQLJdbcUtils.java       |  28 ++-
 .../sink/starrocks/StarRocksJdbcUtils.java         |  22 ++-
 .../controller/cluster/ClusterController.java      |  60 +++++++
 .../controller/topic/TopicWebController.java       | 189 ++++++++++++++++++---
 .../tubemq/manager/service/MasterServiceImpl.java  | 125 +++++++++++++-
 .../tubemq/manager/service/TopicServiceImpl.java   | 142 +++++++++++++++-
 12 files changed, 666 insertions(+), 81 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/UrlVerificationUtils.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/UrlVerificationUtils.java
new file mode 100644
index 0000000000..086a73002f
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/UrlVerificationUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+
+public class UrlVerificationUtils {
+
+    /**
+     * Extracts the hostname and validates the port from a JDBC URL with the 
specified prefix.
+     *
+     * @param fullUrl The full JDBC URL to extract the hostname and port from
+     * @param prefix  The expected prefix of the JDBC URL
+     * @throws Exception If the URL format is invalid or the port is invalid
+     */
+    public static void extractHostAndValidatePortFromJdbcUrl(String fullUrl, 
String prefix) throws Exception {
+        if (!fullUrl.startsWith(prefix)) {
+            throw new Exception("Invalid JDBC URL, it should start with " + 
prefix);
+        }
+        // Extract the host and port part after the prefix
+        String hostPortPart = fullUrl.substring(prefix.length());
+        String[] hostPortParts = hostPortPart.split(InlongConstants.SLASH);
+
+        if (hostPortParts.length < 1) {
+            throw new Exception("Invalid JDBC URL format");
+        }
+        String hostPort = hostPortParts[0];
+        String[] hostPortSplit = hostPort.split(InlongConstants.COLON);
+        if (hostPortSplit.length != 2) {
+            throw new Exception("Invalid host:port format in JDBC URL");
+        }
+
+        String portStr = hostPortSplit[1];
+        try {
+            int portNumber = Integer.parseInt(portStr);
+            if (portNumber < 1 || portNumber > 65535) {
+                throw new Exception("Invalid port number in JDBC URL");
+            }
+        } catch (NumberFormatException e) {
+            throw new Exception("Invalid port number format in JDBC URL");
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
index e442383b1d..8f27349fba 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.Pattern;
 
@@ -40,13 +41,16 @@ import javax.validation.constraints.Pattern;
 public class ClickHouseSinkRequest extends SinkRequest {
 
     @ApiModelProperty("JDBC URL of the ClickHouse server")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
     private String jdbcUrl;
 
     @ApiModelProperty("Username of the ClickHouse server")
+    @Length(max = 128, message = "length must be less than or equal to 128")
     private String username;
 
     @ApiModelProperty("User password of the ClickHouse server")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String password;
 
     @ApiModelProperty("Target database name")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index 46b64a0e8c..10ec7bbc18 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -29,8 +29,10 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
 
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -46,12 +48,16 @@ import java.util.Map;
 public class PostgreSQLSinkDTO {
 
     @ApiModelProperty("JDBC URL of the PostgreSQL server")
+    @Length(max = 512, message = "length must be less than or equal to 512")
+    @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
     private String jdbcUrl;
 
     @ApiModelProperty("Username for JDBC URL")
+    @Length(max = 128, message = "length must be less than or equal to 128")
     private String username;
 
     @ApiModelProperty("User password")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String password;
 
     @ApiModelProperty("Target database name")
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
index e66f502977..4c3148a107 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.service.resource.sink.ck;
 
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import ru.yandex.clickhouse.ClickHouseDatabaseMetadata;
@@ -39,34 +39,51 @@ import java.util.Objects;
 public class ClickHouseJdbcUtils {
 
     private static final String CLICKHOUSE_DRIVER_CLASS = 
"ru.yandex.clickhouse.ClickHouseDriver";
-    private static final String METADATA_TYPE = "TABLE";
     private static final String COLUMN_LABEL = "TABLE_NAME";
-    private static final String CLICKHOUSE_JDBC_PREFIX = "jdbc:clickhouse";
+    private static final String CLICKHOUSE_JDBC_PREFIX = "jdbc:clickhouse://";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ClickHouseJdbcUtils.class);
 
     /**
-     * Get ClickHouse connection from clickhouse url and user
+     * Get ClickHouse connection from ClickHouse URL and user.
+     *
+     * @param url      JDBC URL, such as jdbc:clickhouse://host:port/database
+     * @param user     Username for JDBC URL
+     * @param password User password
+     * @return {@link Connection}
+     * @throws Exception on get connection error
      */
     public static Connection getConnection(String url, String user, String 
password) throws Exception {
-        if (StringUtils.isBlank(url) || 
!url.startsWith(CLICKHOUSE_JDBC_PREFIX)) {
-            throw new Exception("ClickHouse server URL was invalid, it should 
start with jdbc:clickhouse");
-        }
+        // Non-empty validation
+        UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, 
CLICKHOUSE_JDBC_PREFIX);
+        Connection conn = establishConnection(url, user, password);
+        return conn;
+    }
+
+    /**
+     * Establishes a ClickHouse JDBC connection using the provided URL, 
username, and password.
+     *
+     * @param url      The ClickHouse JDBC URL
+     * @param user     The username
+     * @param password The user's password
+     * @return A {@link Connection} object representing the ClickHouse 
database connection
+     * @throws Exception If an error occurs while obtaining the connection
+     */
+    private static Connection establishConnection(String url, String user, 
String password) throws Exception {
         Connection conn;
         try {
             Class.forName(CLICKHOUSE_DRIVER_CLASS);
             conn = DriverManager.getConnection(url, user, password);
         } catch (Exception e) {
-            LOG.error("get clickhouse connection error, please check 
clickhouse jdbc url, username or password", e);
-            throw new Exception("get clickhouse connection error, please check 
jdbc url, username or password. "
+            LOG.error("get ClickHouse connection error, please check 
ClickHouse JDBC URL, username or password", e);
+            throw new Exception("get ClickHouse connection error, please check 
JDBC URL, username or password. "
                     + "other error msg: " + e.getMessage());
         }
 
         if (conn == null) {
-            throw new Exception("get clickhouse connection failed, please 
contact administrator");
+            throw new Exception("get ClickHouse connection failed, please 
contact administrator");
         }
-
-        LOG.info("get clickhouse connection success, url={}", url);
+        LOG.info("get ClickHouse connection success, url={}", url);
         return conn;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
index f63bbddf18..df27394a1f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.service.resource.sink.hive;
 
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
 import org.apache.inlong.manager.pojo.sink.hive.HiveColumnInfo;
 import org.apache.inlong.manager.pojo.sink.hive.HiveTableInfo;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hive.jdbc.HiveDatabaseMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,28 +40,50 @@ public class HiveJdbcUtils {
     private static final String HIVE_DRIVER_CLASS = 
"org.apache.hive.jdbc.HiveDriver";
     private static final String METADATA_TYPE = "TABLE";
     private static final String COLUMN_LABEL = "TABLE_NAME";
-    private static final String HIVE_JDBC_PREFIX = "jdbc:hive2";
+    private static final String HIVE_JDBC_PREFIX = "jdbc:hive2://";
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HiveJdbcUtils.class);
 
     /**
-     * Get Hive connection from hive url and user
+     * Get Hive connection from Hive URL and user.
+     *
+     * @param url      JDBC URL, such as jdbc:hive2://host:port/database
+     * @param user     Username for JDBC URL
+     * @param password User password
+     * @return {@link Connection}
+     * @throws Exception on get connection error
      */
     public static Connection getConnection(String url, String user, String 
password) throws Exception {
-        if (StringUtils.isBlank(url) || !url.startsWith(HIVE_JDBC_PREFIX)) {
-            throw new Exception("hive server url should start with " + 
HIVE_JDBC_PREFIX);
-        }
+        UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, 
HIVE_JDBC_PREFIX);
+        return createConnection(url, user, password);
+    }
+
+    /**
+     * Creates a Hive JDBC connection using the provided URL, username, and 
password.
+     *
+     * @param url      The Hive JDBC URL
+     * @param user     The username
+     * @param password The user's password
+     * @return A {@link Connection} object representing the Hive database 
connection
+     * @throws Exception If an error occurs while obtaining the connection
+     */
+    private static Connection createConnection(String url, String user, String 
password) throws Exception {
         Connection conn;
         try {
             Class.forName(HIVE_DRIVER_CLASS);
             conn = DriverManager.getConnection(url, user, password);
-            LOGGER.info("get hive connection success, url={}", url);
-            return conn;
         } catch (Exception e) {
-            String errMsg = "get hive connection error, please check hive jdbc 
url, username or password";
-            LOGGER.error(errMsg, e);
-            throw new Exception(errMsg + ", error: " + e.getMessage());
+            String errorMsg = "Failed to get Hive connection, please check 
Hive JDBC URL, username, or password!";
+            LOGGER.error(errorMsg, e);
+            throw new Exception(errorMsg + " Other error message: " + 
e.getMessage());
         }
+
+        if (conn == null) {
+            throw new Exception("Failed to get Hive connection, please contact 
the administrator.");
+        }
+
+        LOGGER.info("Successfully obtained Hive connection for URL: {}", url);
+        return conn;
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
index 7c0e6483bc..d4cc922797 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.manager.service.resource.sink.mysql;
 
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
 
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +38,7 @@ import java.util.Objects;
  */
 public class MySQLJdbcUtils {
 
-    private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql";
+    private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
     private static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MySQLJdbcUtils.class);
 
@@ -52,21 +52,29 @@ public class MySQLJdbcUtils {
      * @throws Exception on get connection error
      */
     public static Connection getConnection(String url, String user, String 
password) throws Exception {
-        if (StringUtils.isBlank(url) || !url.startsWith(MYSQL_JDBC_PREFIX)) {
-            throw new Exception("MySQL JDBC URL was invalid, it should start 
with jdbc:mysql");
-        }
+        UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, 
MYSQL_JDBC_PREFIX);
+        Connection conn = establishDatabaseConnection(url, user, password);
+        return conn;
+    }
 
+    /**
+     * Establishes a database connection using the provided URL, username, and 
password.
+     *
+     * @param url      The JDBC URL
+     * @param user     The username
+     * @param password The user's password
+     * @return A {@link Connection} object representing the database connection
+     * @throws Exception If an error occurs while obtaining the connection
+     */
+    private static Connection establishDatabaseConnection(String url, String 
user, String password) throws Exception {
         Connection conn;
         try {
             Class.forName(MYSQL_DRIVER_CLASS);
             conn = DriverManager.getConnection(url, user, password);
         } catch (Exception e) {
-            String errorMsg = "get MySQL connection error, please check MySQL 
JDBC url, username or password!";
+            String errorMsg = "Failed to get MySQL connection, please check 
MySQL JDBC URL, username, or password!";
             LOGGER.error(errorMsg, e);
-            throw new Exception(errorMsg + " other error msg: " + 
e.getMessage());
-        }
-        if (Objects.isNull(conn)) {
-            throw new Exception("get MySQL connection failed, please contact 
administrator.");
+            throw new Exception(errorMsg + " Other error message: " + 
e.getMessage());
         }
         LOGGER.info("get MySQL connection success for url={}", url);
         return conn;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
index 4ff3508389..790d1ddff4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.service.resource.sink.postgresql;
 
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
 import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLColumnInfo;
 import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLTableInfo;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +39,7 @@ public class PostgreSQLJdbcUtils {
 
     private static final String POSTGRES_DRIVER_CLASS = 
"org.postgresql.Driver";
 
-    private static final String POSTGRES_JDBC_PREFIX = "jdbc:postgresql";
+    private static final String POSTGRES_JDBC_PREFIX = "jdbc:postgresql://";
 
     private static final String POSTGRESQL_DEFAULT_SCHEMA = "public";
 
@@ -54,20 +54,32 @@ public class PostgreSQLJdbcUtils {
      * @return {@link Connection}
      * @throws Exception on get connection error
      */
-    public static Connection getConnection(String url, String user, String 
password)
-            throws Exception {
-        if (StringUtils.isBlank(url) || !url.startsWith(POSTGRES_JDBC_PREFIX)) 
{
-            throw new Exception("PostgreSQL server URL was invalid, it should 
start with jdbc:postgresql");
-        }
+    public static Connection getConnection(String url, String user, String 
password) throws Exception {
+        UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, 
POSTGRES_JDBC_PREFIX);
+        return establishDatabaseConnection(url, user, password);
+    }
+
+    /**
+     * Establishes a PostgreSQL database connection using the provided URL, 
username, and password.
+     *
+     * @param url      The PostgreSQL JDBC URL
+     * @param user     The username
+     * @param password The user's password
+     * @return A {@link Connection} representing the PostgreSQL database 
connection
+     * @throws Exception If an error occurs during connection establishment
+     */
+    private static Connection establishDatabaseConnection(String url, String 
user, String password) throws Exception {
         Connection conn;
         try {
             Class.forName(POSTGRES_DRIVER_CLASS);
             conn = DriverManager.getConnection(url, user, password);
         } catch (Exception e) {
-            String errorMsg = "get PostgreSQL connection error, please check 
postgresql jdbc url, username or password";
+            String errorMsg =
+                    "get PostgreSQL connection error, please check PostgreSQL 
JDBC URL, username, or password";
             LOG.error(errorMsg, e);
             throw new Exception(errorMsg + ": " + e.getMessage());
         }
+
         if (conn == null) {
             throw new Exception("get PostgreSQL connection failed, please 
contact administrator");
         }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
index 5625701c3c..c0f0bc80ff 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.manager.service.resource.sink.starrocks;
 
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
 
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hive.jdbc.HiveDatabaseMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,30 +39,34 @@ public class StarRocksJdbcUtils {
     private static final String STAR_ROCKS_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
     private static final String METADATA_TYPE = "TABLE";
     private static final String COLUMN_LABEL = "TABLE_NAME";
-    private static final String STAR_ROCKS_JDBC_PREFIX = "jdbc:mysql";
+    private static final String STAR_ROCKS_JDBC_PREFIX = "jdbc:mysql://";
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StarRocksJdbcUtils.class);
 
     /**
-     * Get starRocks connection from starRocks url and user
+     * Get a StarRocks JDBC connection using the provided URL, username, and 
password.
+     *
+     * @param url      The StarRocks JDBC URL.
+     * @param user     The username for authentication.
+     * @param password The password for authentication.
+     * @return A {@link Connection} representing the StarRocks database 
connection.
+     * @throws Exception If an error occurs during connection establishment.
      */
     public static Connection getConnection(String url, String user, String 
password) throws Exception {
-        if (StringUtils.isBlank(url) || 
!url.startsWith(STAR_ROCKS_JDBC_PREFIX)) {
-            throw new Exception("starRocks server url should start with " + 
STAR_ROCKS_JDBC_PREFIX);
-        }
+        UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, 
STAR_ROCKS_JDBC_PREFIX);
         Connection conn;
         try {
             Class.forName(STAR_ROCKS_DRIVER_CLASS);
             conn = DriverManager.getConnection(url, user, password);
-            LOGGER.info("get star rocks connection success, url={}", url);
+            LOGGER.info("Successfully obtained StarRocks connection, URL: {}", 
url);
             return conn;
         } catch (Exception e) {
-            String errMsg = "get star rocks connection error, please check 
starRocks jdbc url, username or password";
+            String errMsg =
+                    "Failed to get StarRocks connection, please check 
StarRocks JDBC URL, username, or password.";
             LOGGER.error(errMsg, e);
             throw new Exception(errMsg + ", error: " + e.getMessage());
         }
     }
-
     /**
      * Execute sql on the specified starRocks Server
      *
diff --git 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
index dec8768c15..8e4c346e53 100644
--- 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
+++ 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
@@ -43,6 +43,8 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.util.CollectionUtils;
@@ -54,6 +56,7 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -66,6 +69,7 @@ public class ClusterController {
 
     private final Gson gson = new Gson();
     private final TubeMQResult result = new TubeMQResult();
+    private static final Logger LOGGER = 
LogManager.getLogger(ClusterController.class);
 
     @Autowired
     private ClusterService clusterService;
@@ -73,18 +77,74 @@ public class ClusterController {
     @Autowired
     private MasterService masterService;
 
+    /**
+     * Handles cluster-related requests based on the specified method.
+     *
+     * @param method The method to perform (e.g., "add", "delete", "modify", 
"switch")
+     * @param req    The request body containing relevant data
+     * @return A TubeMQResult indicating the result of the operation
+     */
     @PostMapping("")
     public @ResponseBody TubeMQResult clusterMethodProxy(@RequestParam String 
method, @RequestBody String req) {
+        if (!isValidMethod(method)) {
+            LOGGER.warn("Invalid method value received: {}", method);
+            return TubeMQResult.errorResult("Invalid method value.");
+        }
+
+        if (!isValidURL(req)) {
+            LOGGER.warn("Invalid URL format received: {}", req);
+            return TubeMQResult.errorResult("Invalid URL format.");
+        }
+
+        return processClusterRequest(method, req);
+    }
+
+    /**
+     * Validates if the provided URL is in a valid format.
+     *
+     * @param url The URL to validate
+     * @return True if the URL is valid, otherwise false
+     */
+    private static boolean isValidURL(String url) {
+        String urlPattern = "^(https?|ftp)://[a-zA-Z0-9.-]+(/.*)?$";
+        return url.matches(urlPattern);
+    }
+
+    /**
+     * Validates if the provided method is one of the allowed methods.
+     *
+     * @param method The method to validate
+     * @return True if the method is valid, otherwise false
+     */
+    private static boolean isValidMethod(String method) {
+        List<String> allowedMethods =
+                Arrays.asList(TubeConst.ADD, TubeConst.DELETE, 
TubeConst.MODIFY, TubeConst.SWITCH);
+        return allowedMethods.contains(method);
+    }
+
+    /**
+     * Processes the cluster-related request based on the specified method.
+     *
+     * @param method The method to perform (e.g., "add", "delete", "modify", 
"switch")
+     * @param req    The request body containing relevant data
+     * @return A TubeMQResult indicating the result of the operation
+     */
+    private TubeMQResult processClusterRequest(String method, String req) {
         switch (method) {
             case TubeConst.ADD:
+                LOGGER.info("Received 'add' operation with URL: {}", req);
                 return addNewCluster(gson.fromJson(req, AddClusterReq.class));
             case TubeConst.DELETE:
+                LOGGER.info("Received 'delete' operation with URL: {}", req);
                 return deleteCluster(gson.fromJson(req, 
DeleteClusterReq.class));
             case TubeConst.MODIFY:
+                LOGGER.info("Received 'modify' operation with URL: {}", req);
                 return changeCluster(gson.fromJson(req, ClusterDto.class));
             case TubeConst.SWITCH:
+                LOGGER.info("Received 'switch' operation with URL: {}", req);
                 return masterService.baseRequestMaster(gson.fromJson(req, 
SwitchClusterReq.class));
             default:
+                LOGGER.warn("Received unknown method: {}", method);
                 return 
TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_METHOD);
         }
     }
diff --git 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
index f90dd21167..655f43952c 100644
--- 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
+++ 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
@@ -33,7 +33,10 @@ import 
org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
 import org.apache.inlong.tubemq.manager.service.interfaces.TopicService;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -42,6 +45,8 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 @RestController
@@ -49,10 +54,12 @@ import java.util.Map;
 @Slf4j
 public class TopicWebController {
 
+    private static final Logger LOGGER = 
LogManager.getLogger(TopicWebController.class);
+
     @Autowired
     private NodeService nodeService;
 
-    private Gson gson = new Gson();
+    private static final Gson gson = new Gson();
 
     @Autowired
     private MasterService masterService;
@@ -61,47 +68,189 @@ public class TopicWebController {
     private TopicService topicService;
 
     /**
-     * broker method proxy
-     * divides the operation on broker to different method
+     * Broker method proxy.
+     * Divides the operation on the broker into different methods.
      */
     @RequestMapping(value = "")
     public @ResponseBody TubeMQResult topicMethodProxy(@RequestParam String 
method, @RequestBody String req)
             throws Exception {
+        // Log audit: Record the received method and req parameters
+        LOGGER.info("Received method for topicMethodProxy: {}", method);
+        LOGGER.info("Received req for topicMethodProxy: {}", req);
+
+        // Validate the 'method' parameter
+        if (!isValidMethod(method)) {
+            return handleInvalidMethod(method);
+        }
+
+        // Validate the 'req' parameter
+        if (!isValidJson(req)) {
+            return handleInvalidJson(req);
+        }
+
+        // Perform processing based on the 'method' parameter
         switch (method) {
             case TubeConst.ADD:
-                return masterService.baseRequestMaster(gson.fromJson(req, 
BatchAddTopicReq.class));
+                return handleAddTopicRequest(req);
             case TubeConst.CLONE:
-                return nodeService.cloneTopicToBrokers(gson.fromJson(req, 
CloneTopicReq.class));
+                return handleCloneTopicRequest(req);
             case TubeConst.AUTH_CONTROL:
-                return setAuthControl(gson.fromJson(req, 
SetAuthControlReq.class));
+                return handleAuthControlRequest(req);
             case TubeConst.MODIFY:
-                return masterService.baseRequestMaster(gson.fromJson(req, 
ModifyTopicReq.class));
+                return handleModifyTopicRequest(req);
             case TubeConst.DELETE:
             case TubeConst.REMOVE:
-                return masterService.baseRequestMaster(gson.fromJson(req, 
DeleteTopicReq.class));
+                return handleDeleteTopicRequest(req);
             case TubeConst.QUERY_CAN_WRITE:
-                return queryCanWrite(gson.fromJson(req, 
QueryCanWriteReq.class));
+                return handleQueryCanWriteRequest(req);
             case TubeConst.PUBLISH:
-                return masterService.baseRequestMaster(gson.fromJson(req, 
SetPublishReq.class));
+                return handlePublishRequest(req);
             case TubeConst.SUBSCRIBE:
-                return masterService.baseRequestMaster(gson.fromJson(req, 
SetSubscribeReq.class));
+                return handleSubscribeRequest(req);
             default:
-                return 
TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_METHOD);
+                return handleInvalidMethod(method);
         }
     }
 
-    private TubeMQResult setAuthControl(SetAuthControlReq req) {
-        req.setMethod(TubeConst.SET_AUTH_CONTROL);
-        req.setType(TubeConst.OP_MODIFY);
-        req.setCreateUser(TubeConst.TUBEADMIN);
-        return masterService.baseRequestMaster(req);
+    /**
+     * Handles invalid 'method' parameter.
+     *
+     * @param method The invalid method value
+     * @return TubeMQResult indicating an error
+     */
+    private TubeMQResult handleInvalidMethod(String method) {
+        LOGGER.warn("Invalid method value received: {}", method);
+        return TubeMQResult.errorResult("Invalid method value.");
+    }
+
+    /**
+     * Handles invalid JSON format in 'req' parameter.
+     *
+     * @param req The invalid JSON format
+     * @return TubeMQResult indicating an error
+     */
+    private TubeMQResult handleInvalidJson(String req) {
+        LOGGER.warn("Invalid JSON format received: {}", req);
+        return TubeMQResult.errorResult("Invalid JSON format.");
+    }
+
+    /**
+     * Handles 'ADD' method request.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handleAddTopicRequest(String req) {
+        return masterService.baseRequestMaster(gson.fromJson(req, 
BatchAddTopicReq.class));
+    }
+
+    /**
+     * Handles 'CLONE' method request.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handleCloneTopicRequest(String req) throws Exception {
+        return nodeService.cloneTopicToBrokers(gson.fromJson(req, 
CloneTopicReq.class));
     }
 
-    private TubeMQResult queryCanWrite(QueryCanWriteReq req) {
-        if (!req.legal()) {
+    /**
+     * Handles 'AUTH_CONTROL' method request.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handleAuthControlRequest(String req) {
+        SetAuthControlReq setAuthControlReq = gson.fromJson(req, 
SetAuthControlReq.class);
+        setAuthControlReq.setMethod(TubeConst.SET_AUTH_CONTROL);
+        setAuthControlReq.setType(TubeConst.OP_MODIFY);
+        setAuthControlReq.setCreateUser(TubeConst.TUBEADMIN);
+        return masterService.baseRequestMaster(setAuthControlReq);
+    }
+
+    /**
+     * Handles 'MODIFY' method request.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handleModifyTopicRequest(String req) {
+        return masterService.baseRequestMaster(gson.fromJson(req, 
ModifyTopicReq.class));
+    }
+
+    /**
+     * Handles 'DELETE' and 'REMOVE' method requests.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handleDeleteTopicRequest(String req) {
+        return masterService.baseRequestMaster(gson.fromJson(req, 
DeleteTopicReq.class));
+    }
+
+    /**
+     * Handles 'QUERY_CAN_WRITE' method request.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handleQueryCanWriteRequest(String req) {
+        QueryCanWriteReq queryCanWriteReq = gson.fromJson(req, 
QueryCanWriteReq.class);
+        if (!queryCanWriteReq.legal()) {
             return TubeMQResult.errorResult(TubeMQErrorConst.PARAM_ILLEGAL);
         }
-        return topicService.queryCanWrite(req.getTopicName(), 
req.getClusterId());
+        return topicService.queryCanWrite(queryCanWriteReq.getTopicName(), 
queryCanWriteReq.getClusterId());
+    }
+
+    /**
+     * Handles 'PUBLISH' method request.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handlePublishRequest(String req) {
+        return masterService.baseRequestMaster(gson.fromJson(req, 
SetPublishReq.class));
+    }
+
+    /**
+     * Handles 'SUBSCRIBE' method request.
+     *
+     * @param req The JSON request
+     * @return TubeMQResult based on the request
+     */
+    private TubeMQResult handleSubscribeRequest(String req) {
+        return masterService.baseRequestMaster(gson.fromJson(req, 
SetSubscribeReq.class));
+    }
+
+    /**
+     * Checks if the given method is valid by comparing it against a list of 
allowed methods.
+     *
+     * @param method The method to validate.
+     * @return {@code true} if the method is valid, {@code false} otherwise.
+     */
+    private static boolean isValidMethod(String method) {
+        // Define a list of allowed methods
+        List<String> allowedMethods = Arrays.asList(
+                TubeConst.ADD, TubeConst.CLONE, TubeConst.AUTH_CONTROL, 
TubeConst.MODIFY,
+                TubeConst.DELETE, TubeConst.REMOVE, TubeConst.QUERY_CAN_WRITE, 
TubeConst.PUBLISH, TubeConst.SUBSCRIBE);
+        return allowedMethods.contains(method);
+    }
+
+    /**
+     * Validates whether the given JSON string has a valid format by 
attempting to parse it.
+     *
+     * @param json The JSON string to validate.
+     * @return {@code true} if the JSON format is valid, {@code false} 
otherwise.
+     */
+    private static boolean isValidJson(String json) {
+        // Use a JSON library or parser to validate the JSON format
+        try {
+            gson.fromJson(json, Object.class);
+            return true;
+        } catch (JsonSyntaxException e) {
+            LOGGER.error("JSON validation failed with exception: {}", 
e.getMessage());
+            return false;
+        }
     }
 
     /**
diff --git 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
index f16ec6459e..d873c2e9cd 100644
--- 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
+++ 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
@@ -37,6 +37,10 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
@@ -54,11 +58,112 @@ public class MasterServiceImpl implements MasterService {
     @Autowired
     MasterRepository masterRepository;
 
+    /**
+     * Request the master using the given URL.
+     *
+     * @param url The URL to request.
+     * @return TubeMQResult representing the result of the request.
+     */
     @Override
     public TubeMQResult requestMaster(String url) {
-
         log.info("start to request {}", url);
+        long startTime = System.currentTimeMillis();
+
+        if (!isValidURL(url)) {
+            log.error("Invalid URL: {}", url);
+            logRequestDetails(url, startTime, "Invalid URL");
+            return TubeMQResult.errorResult("Invalid URL.");
+        }
+
+        String hostname = getHostnameFromURL(url);
+
+        if (!isValidHostname(hostname)) {
+            log.error("Invalid hostname: {}", hostname);
+            logRequestDetails(url, startTime, "Invalid hostname");
+            return TubeMQResult.errorResult("Invalid hostname.");
+        }
+
         HttpGet httpGet = new HttpGet(url);
+        return executeHttpRequest(httpGet, url, startTime);
+    }
+
+    /**
+     * Logs request details including URL, status, and duration.
+     *
+     * @param url     The URL being requested.
+     * @param startTime The start time of the request.
+     * @param status  The status of the request.
+     */
+    private void logRequestDetails(String url, long startTime, String status) {
+        long endTime = System.currentTimeMillis();
+        long duration = endTime - startTime;
+        log.info("Request Details - URL: {}, Status: {}, Duration: {} ms", 
url, status, duration);
+    }
+
+    /**
+     * Extracts the hostname from the given URL.
+     *
+     * @param url The URL from which to extract the hostname.
+     * @return The extracted hostname.
+     */
+    private String getHostnameFromURL(String url) {
+        try {
+            URL u = new URL(url);
+            return u.getHost();
+        } catch (MalformedURLException e) {
+            log.warn("Failed to extract hostname from URL: {}", 
e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * Validates the hostname by performing DNS resolution.
+     *
+     * @param hostname The hostname to validate.
+     * @return true if the hostname is valid, false otherwise.
+     */
+    private boolean isValidHostname(String hostname) {
+        if (hostname == null) {
+            return false;
+        }
+        try {
+            InetAddress.getByName(hostname);
+            return true;
+        } catch (UnknownHostException e) {
+            log.error("DNS resolution failed for hostname: {}", hostname, e);
+            return false;
+        }
+    }
+
+    /**
+     * Validates the format of the URL and checks if it starts with "http" or 
"https".
+     *
+     * @param url The URL to validate.
+     * @return true if the URL is valid, false otherwise.
+     */
+    private boolean isValidURL(String url) {
+        try {
+            URL u = new URL(url);
+            String protocol = u.getProtocol().toLowerCase();
+            if ("http".equals(protocol) || "https".equals(protocol)) {
+                return true;
+            }
+        } catch (MalformedURLException e) {
+            log.warn("URL validation failed with exception: {}", 
e.getMessage());
+            return false;
+        }
+        return false;
+    }
+
+    /**
+     * Executes an HTTP request and returns the TubeMQResult.
+     *
+     * @param httpGet    The HttpGet request to execute.
+     * @param url        The URL being requested.
+     * @param startTime  The start time of the request.
+     * @return TubeMQResult representing the result of the HTTP request.
+     */
+    private TubeMQResult executeHttpRequest(HttpGet httpGet, String url, long 
startTime) {
         TubeMQResult defaultResult = new TubeMQResult();
 
         try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
@@ -67,13 +172,16 @@ public class MasterServiceImpl implements MasterService {
                             StandardCharsets.UTF_8), TubeHttpResponse.class);
             if (tubeResponse.getCode() == TubeConst.SUCCESS_CODE
                     && tubeResponse.getErrCode() == TubeConst.SUCCESS_CODE) {
+                logRequestDetails(url, startTime, "Success");
                 return defaultResult;
             } else {
                 defaultResult = errorResult(tubeResponse.getErrMsg());
+                logRequestDetails(url, startTime, "Failed: " + 
tubeResponse.getErrMsg());
             }
         } catch (Exception ex) {
             log.error("exception caught while requesting broker status", ex);
             defaultResult = TubeMQResult.errorResult(ex.getMessage());
+            logRequestDetails(url, startTime, "Exception: " + ex.getMessage());
         }
         return defaultResult;
     }
@@ -87,9 +195,22 @@ public class MasterServiceImpl implements MasterService {
     @Override
     public String queryMaster(String url) {
         log.info("start to request {}", url);
+
+        if (!isValidURL(url)) {
+            log.error("Invalid URL: {}", url);
+            return gson.toJson(TubeMQResult.errorResult("Invalid URL."));
+        }
+
         HttpGet httpGet = new HttpGet(url);
         TubeMQResult defaultResult = new TubeMQResult();
         try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+            // If the redirected URL is different from the original URL, 
perform further validation
+            String redirectedUrl = 
response.getHeaders("Location")[0].getValue();
+            if (!url.equals(redirectedUrl) && !isValidURL(redirectedUrl)) {
+                log.error("Invalid redirected URL: {}", redirectedUrl);
+                return gson.toJson(TubeMQResult.errorResult("Invalid 
redirected URL."));
+            }
+
             // return result json to response
             return EntityUtils.toString(response.getEntity());
         } catch (Exception ex) {
@@ -97,8 +218,8 @@ public class MasterServiceImpl implements MasterService {
             defaultResult.setErrCode(-1);
             defaultResult.setResult(false);
             defaultResult.setErrMsg(ex.getMessage());
+            return gson.toJson(defaultResult);
         }
-        return gson.toJson(defaultResult);
     }
 
     @Override
diff --git 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
index 3bfa6a1bb1..38a3dee833 100644
--- 
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
+++ 
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
@@ -55,9 +55,13 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * node service to query broker/master/standby status of tube cluster.
@@ -70,6 +74,11 @@ public class TopicServiceImpl implements TopicService {
     public static final int MINIMUN_TOPIC_RUN_PART = 1;
     private final CloseableHttpClient httpclient = HttpClients.createDefault();
     private final Gson gson = new Gson();
+    private static final Pattern SPECIAL_CHAR_PATTERN = 
Pattern.compile("[%\\x00-\\x1F\\x7F-\\uFFFF]");
+    private static final int MAX_TOPIC_NAME_LENGTH = 255;
+    private static final String[] DANGEROUS_KEYWORDS = {
+            "exec", "system", "cmd", "shell", "php", "perl", "python", "ruby", 
"javascript", "java"
+    };
 
     @Value("${manager.broker.webPort:8081}")
     private int brokerWebPort;
@@ -111,25 +120,140 @@ public class TopicServiceImpl implements TopicService {
         return TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_GROUP);
     }
 
+    /**
+     * Requests and retrieves topic view information from a TubeMQ cluster.
+     *
+     * @param clusterId  The ID of the TubeMQ cluster.
+     * @param topicName  The name of the topic to retrieve information for.
+     * @return           The TopicView object containing topic information.
+     * @throws IllegalArgumentException  If any validation checks fail.
+     * @throws RuntimeException          If an exception occurs during the 
request.
+     */
     @Override
     public TopicView requestTopicViewInfo(Long clusterId, String topicName) {
         MasterEntry masterNode = masterService.getMasterNode(clusterId);
-        String url = TubeConst.SCHEMA + masterNode.getIp() + ":" + 
masterNode.getWebPort()
-                + TubeConst.TOPIC_VIEW;
-        if (StringUtils.isNotBlank(topicName)) {
-            url = StringUtils.join(url, TubeConst.TOPIC_NAME, topicName);
-        }
+        validateMasterEntry(masterNode, clusterId);
+        validateTopicName(topicName, clusterId);
+
+        String url = buildTopicViewURL(masterNode, topicName, clusterId);
         HttpGet httpget = new HttpGet(url);
         try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-            return gson.fromJson(new 
InputStreamReader(response.getEntity().getContent(),
-                    StandardCharsets.UTF_8),
-                    TopicView.class);
+            return parseTopicViewResponse(response);
         } catch (Exception ex) {
-            log.error("exception caught while requesting group status", ex);
+            handleRequestException(clusterId, topicName, ex);
             throw new RuntimeException(ex.getMessage());
         }
     }
 
+    /**
+     * Validates if the provided MasterEntry is valid.
+     *
+     * @param masterNode The MasterEntry to validate.
+     * @param clusterId  The ID of the TubeMQ cluster.
+     * @throws IllegalArgumentException If the MasterEntry is invalid.
+     */
+    private void validateMasterEntry(MasterEntry masterNode, Long clusterId) {
+        if (masterNode == null || StringUtils.isBlank(masterNode.getIp()) || 
masterNode.getWebPort() <= 0) {
+            log.error("Invalid MasterEntry: ClusterId = {}", clusterId);
+            throw new IllegalArgumentException("Invalid MasterEntry.");
+        }
+    }
+
+    /**
+     * Validates if the provided topic name is valid.
+     *
+     * @param topicName The topic name to validate.
+     * @param clusterId The ID of the TubeMQ cluster.
+     * @throws IllegalArgumentException If the topic name is invalid.
+     */
+    private void validateTopicName(String topicName, Long clusterId) {
+        if (StringUtils.isBlank(topicName) || containsDangerousChars(topicName)
+                || topicName.length() > MAX_TOPIC_NAME_LENGTH) {
+            log.error("Invalid topicName: ClusterId = {}, TopicName = {}", 
clusterId, topicName);
+            throw new IllegalArgumentException("Invalid topicName.");
+        }
+    }
+
+    /**
+     * Builds the URL for requesting topic view information.
+     *
+     * @param masterNode The MasterEntry representing the TubeMQ master node.
+     * @param topicName  The name of the topic.
+     * @param clusterId  The ID of the TubeMQ cluster.
+     * @return           The constructed URL.
+     * @throws IllegalArgumentException If the URL is invalid.
+     */
+    private String buildTopicViewURL(MasterEntry masterNode, String topicName, 
Long clusterId) {
+        String url = TubeConst.SCHEMA + masterNode.getIp() + ":" + 
masterNode.getWebPort() + TubeConst.TOPIC_VIEW;
+        if (!isValidURL(url)) {
+            log.error("Invalid URL: ClusterId = {}, URL = {}", clusterId, url);
+            throw new IllegalArgumentException("Invalid URL.");
+        }
+        return url + TubeConst.TOPIC_NAME + topicName;
+    }
+
+    /**
+     * Parses the response to obtain a TopicView object.
+     *
+     * @param response The HTTP response containing topic view information.
+     * @return         The parsed TopicView object.
+     * @throws Exception If an exception occurs during parsing.
+     */
+    private TopicView parseTopicViewResponse(CloseableHttpResponse response) 
throws Exception {
+        return gson.fromJson(new 
InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8),
+                TopicView.class);
+    }
+
+    /**
+     * Handles exceptions that occur during the request.
+     *
+     * @param clusterId The ID of the TubeMQ cluster.
+     * @param topicName The name of the topic.
+     * @param ex        The exception that occurred.
+     */
+    private void handleRequestException(Long clusterId, String topicName, 
Exception ex) {
+        log.error("Exception caught while requesting topic view: ClusterId = 
{}, TopicName = {}", clusterId, topicName,
+                ex);
+    }
+
+    /**
+     * Checks if the input string contains dangerous characters or keywords 
that may pose security risks,
+     * such as those commonly associated with SSRF attacks.
+     *
+     * @param input The input string to be checked for dangerous characters or 
keywords.
+     * @return True if the input contains dangerous characters or keywords, 
otherwise false.
+     */
+    private boolean containsDangerousChars(String input) {
+        input = input.toLowerCase();
+        // Prevent SSRF attacks by checking for "://"
+        if (input.contains("://")) {
+            return true;
+        }
+        // Check for other possible dangerous characters or keywords
+        if (StringUtils.containsAny(input, DANGEROUS_KEYWORDS)) {
+            return true;
+        }
+        // Check for encoding of special characters or escape characters
+        Matcher matcher = SPECIAL_CHAR_PATTERN.matcher(input);
+        return matcher.find();
+    }
+
+    /**
+     * Validates if the provided URL string is in a valid URL format.
+     *
+     * @param url The URL string to be validated.
+     * @return True if the URL is in a valid format, otherwise false.
+     */
+    private boolean isValidURL(String url) {
+        try {
+            new URL(url);
+            return true;
+        } catch (MalformedURLException e) {
+            log.warn("URL validation failed with exception: {}", 
e.getMessage());
+            return false;
+        }
+    }
+
     @Override
     public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
         MasterEntry master = 
masterService.getMasterNode(Long.valueOf(req.getClusterId()));

Reply via email to