This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 18c39ff593 [INLONG-8961][Manager] Server-side request forgery attack
prevention in some classes (#9012)
18c39ff593 is described below
commit 18c39ff593b1afb71063d85dc89adc4d589516cf
Author: sunrisefromdark <[email protected]>
AuthorDate: Wed Oct 11 22:03:53 2023 +0800
[INLONG-8961][Manager] Server-side request forgery attack prevention in
some classes (#9012)
---
.../manager/common/util/UrlVerificationUtils.java | 58 +++++++
.../pojo/sink/ck/ClickHouseSinkRequest.java | 4 +
.../pojo/sink/postgresql/PostgreSQLSinkDTO.java | 6 +
.../resource/sink/ck/ClickHouseJdbcUtils.java | 41 +++--
.../service/resource/sink/hive/HiveJdbcUtils.java | 44 +++--
.../resource/sink/mysql/MySQLJdbcUtils.java | 28 +--
.../sink/postgresql/PostgreSQLJdbcUtils.java | 28 ++-
.../sink/starrocks/StarRocksJdbcUtils.java | 22 ++-
.../controller/cluster/ClusterController.java | 60 +++++++
.../controller/topic/TopicWebController.java | 189 ++++++++++++++++++---
.../tubemq/manager/service/MasterServiceImpl.java | 125 +++++++++++++-
.../tubemq/manager/service/TopicServiceImpl.java | 142 +++++++++++++++-
12 files changed, 666 insertions(+), 81 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/UrlVerificationUtils.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/UrlVerificationUtils.java
new file mode 100644
index 0000000000..086a73002f
--- /dev/null
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/UrlVerificationUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+
+public class UrlVerificationUtils {
+
+ /**
+ * Extracts the hostname and validates the port from a JDBC URL with the
specified prefix.
+ *
+ * @param fullUrl The full JDBC URL to extract the hostname and port from
+ * @param prefix The expected prefix of the JDBC URL
+ * @throws Exception If the URL format is invalid or the port is invalid
+ */
+ public static void extractHostAndValidatePortFromJdbcUrl(String fullUrl,
String prefix) throws Exception {
+ if (!fullUrl.startsWith(prefix)) {
+ throw new Exception("Invalid JDBC URL, it should start with " +
prefix);
+ }
+ // Extract the host and port part after the prefix
+ String hostPortPart = fullUrl.substring(prefix.length());
+ String[] hostPortParts = hostPortPart.split(InlongConstants.SLASH);
+
+ if (hostPortParts.length < 1) {
+ throw new Exception("Invalid JDBC URL format");
+ }
+ String hostPort = hostPortParts[0];
+ String[] hostPortSplit = hostPort.split(InlongConstants.COLON);
+ if (hostPortSplit.length != 2) {
+ throw new Exception("Invalid host:port format in JDBC URL");
+ }
+
+ String portStr = hostPortSplit[1];
+ try {
+ int portNumber = Integer.parseInt(portStr);
+ if (portNumber < 1 || portNumber > 65535) {
+ throw new Exception("Invalid port number in JDBC URL");
+ }
+ } catch (NumberFormatException e) {
+ throw new Exception("Invalid port number format in JDBC URL");
+ }
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
index e442383b1d..8f27349fba 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.Pattern;
@@ -40,13 +41,16 @@ import javax.validation.constraints.Pattern;
public class ClickHouseSinkRequest extends SinkRequest {
@ApiModelProperty("JDBC URL of the ClickHouse server")
+ @Length(max = 512, message = "length must be less than or equal to 512")
@Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
private String jdbcUrl;
@ApiModelProperty("Username of the ClickHouse server")
+ @Length(max = 128, message = "length must be less than or equal to 128")
private String username;
@ApiModelProperty("User password of the ClickHouse server")
+ @Length(max = 512, message = "length must be less than or equal to 512")
private String password;
@ApiModelProperty("Target database name")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index 46b64a0e8c..10ec7bbc18 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -29,8 +29,10 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
+import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -46,12 +48,16 @@ import java.util.Map;
public class PostgreSQLSinkDTO {
@ApiModelProperty("JDBC URL of the PostgreSQL server")
+ @Length(max = 512, message = "length must be less than or equal to 512")
+ @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
private String jdbcUrl;
@ApiModelProperty("Username for JDBC URL")
+ @Length(max = 128, message = "length must be less than or equal to 128")
private String username;
@ApiModelProperty("User password")
+ @Length(max = 512, message = "length must be less than or equal to 512")
private String password;
@ApiModelProperty("Target database name")
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
index e66f502977..4c3148a107 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
@@ -17,10 +17,10 @@
package org.apache.inlong.manager.service.resource.sink.ck;
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseDatabaseMetadata;
@@ -39,34 +39,51 @@ import java.util.Objects;
public class ClickHouseJdbcUtils {
private static final String CLICKHOUSE_DRIVER_CLASS =
"ru.yandex.clickhouse.ClickHouseDriver";
- private static final String METADATA_TYPE = "TABLE";
private static final String COLUMN_LABEL = "TABLE_NAME";
- private static final String CLICKHOUSE_JDBC_PREFIX = "jdbc:clickhouse";
+ private static final String CLICKHOUSE_JDBC_PREFIX = "jdbc:clickhouse://";
private static final Logger LOG =
LoggerFactory.getLogger(ClickHouseJdbcUtils.class);
/**
- * Get ClickHouse connection from clickhouse url and user
+ * Get ClickHouse connection from ClickHouse URL and user.
+ *
+ * @param url JDBC URL, such as jdbc:clickhouse://host:port/database
+ * @param user Username for JDBC URL
+ * @param password User password
+ * @return {@link Connection}
+ * @throws Exception on get connection error
*/
public static Connection getConnection(String url, String user, String
password) throws Exception {
- if (StringUtils.isBlank(url) ||
!url.startsWith(CLICKHOUSE_JDBC_PREFIX)) {
- throw new Exception("ClickHouse server URL was invalid, it should
start with jdbc:clickhouse");
- }
+ // Non-empty validation
+ UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url,
CLICKHOUSE_JDBC_PREFIX);
+ Connection conn = establishConnection(url, user, password);
+ return conn;
+ }
+
+ /**
+ * Establishes a ClickHouse JDBC connection using the provided URL,
username, and password.
+ *
+ * @param url The ClickHouse JDBC URL
+ * @param user The username
+ * @param password The user's password
+ * @return A {@link Connection} object representing the ClickHouse
database connection
+ * @throws Exception If an error occurs while obtaining the connection
+ */
+ private static Connection establishConnection(String url, String user,
String password) throws Exception {
Connection conn;
try {
Class.forName(CLICKHOUSE_DRIVER_CLASS);
conn = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
- LOG.error("get clickhouse connection error, please check
clickhouse jdbc url, username or password", e);
- throw new Exception("get clickhouse connection error, please check
jdbc url, username or password. "
+ LOG.error("get ClickHouse connection error, please check
ClickHouse JDBC URL, username or password", e);
+ throw new Exception("get ClickHouse connection error, please check
JDBC URL, username or password. "
+ "other error msg: " + e.getMessage());
}
if (conn == null) {
- throw new Exception("get clickhouse connection failed, please
contact administrator");
+ throw new Exception("get ClickHouse connection failed, please
contact administrator");
}
-
- LOG.info("get clickhouse connection success, url={}", url);
+ LOG.info("get ClickHouse connection success, url={}", url);
return conn;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
index f63bbddf18..df27394a1f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveJdbcUtils.java
@@ -17,10 +17,10 @@
package org.apache.inlong.manager.service.resource.sink.hive;
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
import org.apache.inlong.manager.pojo.sink.hive.HiveColumnInfo;
import org.apache.inlong.manager.pojo.sink.hive.HiveTableInfo;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hive.jdbc.HiveDatabaseMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,28 +40,50 @@ public class HiveJdbcUtils {
private static final String HIVE_DRIVER_CLASS =
"org.apache.hive.jdbc.HiveDriver";
private static final String METADATA_TYPE = "TABLE";
private static final String COLUMN_LABEL = "TABLE_NAME";
- private static final String HIVE_JDBC_PREFIX = "jdbc:hive2";
+ private static final String HIVE_JDBC_PREFIX = "jdbc:hive2://";
private static final Logger LOGGER =
LoggerFactory.getLogger(HiveJdbcUtils.class);
/**
- * Get Hive connection from hive url and user
+ * Get Hive connection from Hive URL and user.
+ *
+ * @param url JDBC URL, such as jdbc:hive2://host:port/database
+ * @param user Username for JDBC URL
+ * @param password User password
+ * @return {@link Connection}
+ * @throws Exception on get connection error
*/
public static Connection getConnection(String url, String user, String
password) throws Exception {
- if (StringUtils.isBlank(url) || !url.startsWith(HIVE_JDBC_PREFIX)) {
- throw new Exception("hive server url should start with " +
HIVE_JDBC_PREFIX);
- }
+ UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url,
HIVE_JDBC_PREFIX);
+ return createConnection(url, user, password);
+ }
+
+ /**
+ * Creates a Hive JDBC connection using the provided URL, username, and
password.
+ *
+ * @param url The Hive JDBC URL
+ * @param user The username
+ * @param password The user's password
+ * @return A {@link Connection} object representing the Hive database
connection
+ * @throws Exception If an error occurs while obtaining the connection
+ */
+ private static Connection createConnection(String url, String user, String
password) throws Exception {
Connection conn;
try {
Class.forName(HIVE_DRIVER_CLASS);
conn = DriverManager.getConnection(url, user, password);
- LOGGER.info("get hive connection success, url={}", url);
- return conn;
} catch (Exception e) {
- String errMsg = "get hive connection error, please check hive jdbc
url, username or password";
- LOGGER.error(errMsg, e);
- throw new Exception(errMsg + ", error: " + e.getMessage());
+ String errorMsg = "Failed to get Hive connection, please check
Hive JDBC URL, username, or password!";
+ LOGGER.error(errorMsg, e);
+ throw new Exception(errorMsg + " Other error message: " +
e.getMessage());
}
+
+ if (conn == null) {
+ throw new Exception("Failed to get Hive connection, please contact
the administrator.");
+ }
+
+ LOGGER.info("Successfully obtained Hive connection for URL: {}", url);
+ return conn;
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
index 7c0e6483bc..d4cc922797 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
@@ -17,11 +17,11 @@
package org.apache.inlong.manager.service.resource.sink.mysql;
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ import java.util.Objects;
*/
public class MySQLJdbcUtils {
- private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql";
+ private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
private static final String MYSQL_DRIVER_CLASS =
"com.mysql.cj.jdbc.Driver";
private static final Logger LOGGER =
LoggerFactory.getLogger(MySQLJdbcUtils.class);
@@ -52,21 +52,29 @@ public class MySQLJdbcUtils {
* @throws Exception on get connection error
*/
public static Connection getConnection(String url, String user, String
password) throws Exception {
- if (StringUtils.isBlank(url) || !url.startsWith(MYSQL_JDBC_PREFIX)) {
- throw new Exception("MySQL JDBC URL was invalid, it should start
with jdbc:mysql");
- }
+ UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url,
MYSQL_JDBC_PREFIX);
+ Connection conn = establishDatabaseConnection(url, user, password);
+ return conn;
+ }
+ /**
+ * Establishes a database connection using the provided URL, username, and
password.
+ *
+ * @param url The JDBC URL
+ * @param user The username
+ * @param password The user's password
+ * @return A {@link Connection} object representing the database connection
+ * @throws Exception If an error occurs while obtaining the connection
+ */
+ private static Connection establishDatabaseConnection(String url, String
user, String password) throws Exception {
Connection conn;
try {
Class.forName(MYSQL_DRIVER_CLASS);
conn = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
- String errorMsg = "get MySQL connection error, please check MySQL
JDBC url, username or password!";
+ String errorMsg = "Failed to get MySQL connection, please check
MySQL JDBC URL, username, or password!";
LOGGER.error(errorMsg, e);
- throw new Exception(errorMsg + " other error msg: " +
e.getMessage());
- }
- if (Objects.isNull(conn)) {
- throw new Exception("get MySQL connection failed, please contact
administrator.");
+ throw new Exception(errorMsg + " Other error message: " +
e.getMessage());
}
LOGGER.info("get MySQL connection success for url={}", url);
return conn;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
index 4ff3508389..790d1ddff4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/postgresql/PostgreSQLJdbcUtils.java
@@ -17,10 +17,10 @@
package org.apache.inlong.manager.service.resource.sink.postgresql;
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLColumnInfo;
import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLTableInfo;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +39,7 @@ public class PostgreSQLJdbcUtils {
private static final String POSTGRES_DRIVER_CLASS =
"org.postgresql.Driver";
- private static final String POSTGRES_JDBC_PREFIX = "jdbc:postgresql";
+ private static final String POSTGRES_JDBC_PREFIX = "jdbc:postgresql://";
private static final String POSTGRESQL_DEFAULT_SCHEMA = "public";
@@ -54,20 +54,32 @@ public class PostgreSQLJdbcUtils {
* @return {@link Connection}
* @throws Exception on get connection error
*/
- public static Connection getConnection(String url, String user, String
password)
- throws Exception {
- if (StringUtils.isBlank(url) || !url.startsWith(POSTGRES_JDBC_PREFIX))
{
- throw new Exception("PostgreSQL server URL was invalid, it should
start with jdbc:postgresql");
- }
+ public static Connection getConnection(String url, String user, String
password) throws Exception {
+ UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url,
POSTGRES_JDBC_PREFIX);
+ return establishDatabaseConnection(url, user, password);
+ }
+
+ /**
+ * Establishes a PostgreSQL database connection using the provided URL,
username, and password.
+ *
+ * @param url The PostgreSQL JDBC URL
+ * @param user The username
+ * @param password The user's password
+ * @return A {@link Connection} representing the PostgreSQL database
connection
+ * @throws Exception If an error occurs during connection establishment
+ */
+ private static Connection establishDatabaseConnection(String url, String
user, String password) throws Exception {
Connection conn;
try {
Class.forName(POSTGRES_DRIVER_CLASS);
conn = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
- String errorMsg = "get PostgreSQL connection error, please check
postgresql jdbc url, username or password";
+ String errorMsg =
+ "get PostgreSQL connection error, please check PostgreSQL
JDBC URL, username, or password";
LOG.error(errorMsg, e);
throw new Exception(errorMsg + ": " + e.getMessage());
}
+
if (conn == null) {
throw new Exception("get PostgreSQL connection failed, please
contact administrator");
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
index 5625701c3c..c0f0bc80ff 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
@@ -17,11 +17,11 @@
package org.apache.inlong.manager.service.resource.sink.starrocks;
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hive.jdbc.HiveDatabaseMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,30 +39,34 @@ public class StarRocksJdbcUtils {
private static final String STAR_ROCKS_DRIVER_CLASS =
"com.mysql.cj.jdbc.Driver";
private static final String METADATA_TYPE = "TABLE";
private static final String COLUMN_LABEL = "TABLE_NAME";
- private static final String STAR_ROCKS_JDBC_PREFIX = "jdbc:mysql";
+ private static final String STAR_ROCKS_JDBC_PREFIX = "jdbc:mysql://";
private static final Logger LOGGER =
LoggerFactory.getLogger(StarRocksJdbcUtils.class);
/**
- * Get starRocks connection from starRocks url and user
+ * Get a StarRocks JDBC connection using the provided URL, username, and
password.
+ *
+ * @param url The StarRocks JDBC URL.
+ * @param user The username for authentication.
+ * @param password The password for authentication.
+ * @return A {@link Connection} representing the StarRocks database
connection.
+ * @throws Exception If an error occurs during connection establishment.
*/
public static Connection getConnection(String url, String user, String
password) throws Exception {
- if (StringUtils.isBlank(url) ||
!url.startsWith(STAR_ROCKS_JDBC_PREFIX)) {
- throw new Exception("starRocks server url should start with " +
STAR_ROCKS_JDBC_PREFIX);
- }
+ UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url,
STAR_ROCKS_JDBC_PREFIX);
Connection conn;
try {
Class.forName(STAR_ROCKS_DRIVER_CLASS);
conn = DriverManager.getConnection(url, user, password);
- LOGGER.info("get star rocks connection success, url={}", url);
+ LOGGER.info("Successfully obtained StarRocks connection, URL: {}",
url);
return conn;
} catch (Exception e) {
- String errMsg = "get star rocks connection error, please check
starRocks jdbc url, username or password";
+ String errMsg =
+ "Failed to get StarRocks connection, please check
StarRocks JDBC URL, username, or password.";
LOGGER.error(errMsg, e);
throw new Exception(errMsg + ", error: " + e.getMessage());
}
}
-
/**
* Execute sql on the specified starRocks Server
*
diff --git
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
index dec8768c15..8e4c346e53 100644
---
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
+++
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/ClusterController.java
@@ -43,6 +43,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.util.CollectionUtils;
@@ -54,6 +56,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -66,6 +69,7 @@ public class ClusterController {
private final Gson gson = new Gson();
private final TubeMQResult result = new TubeMQResult();
+ private static final Logger LOGGER =
LogManager.getLogger(ClusterController.class);
@Autowired
private ClusterService clusterService;
@@ -73,18 +77,74 @@ public class ClusterController {
@Autowired
private MasterService masterService;
+ /**
+ * Handles cluster-related requests based on the specified method.
+ *
+ * @param method The method to perform (e.g., "add", "delete", "modify",
"switch")
+ * @param req The request body containing relevant data
+ * @return A TubeMQResult indicating the result of the operation
+ */
@PostMapping("")
public @ResponseBody TubeMQResult clusterMethodProxy(@RequestParam String
method, @RequestBody String req) {
+ if (!isValidMethod(method)) {
+ LOGGER.warn("Invalid method value received: {}", method);
+ return TubeMQResult.errorResult("Invalid method value.");
+ }
+
+ if (!isValidURL(req)) {
+ LOGGER.warn("Invalid URL format received: {}", req);
+ return TubeMQResult.errorResult("Invalid URL format.");
+ }
+
+ return processClusterRequest(method, req);
+ }
+
+ /**
+ * Validates if the provided URL is in a valid format.
+ *
+ * @param url The URL to validate
+ * @return True if the URL is valid, otherwise false
+ */
+ private static boolean isValidURL(String url) {
+ String urlPattern = "^(https?|ftp)://[a-zA-Z0-9.-]+(/.*)?$";
+ return url.matches(urlPattern);
+ }
+
+ /**
+ * Validates if the provided method is one of the allowed methods.
+ *
+ * @param method The method to validate
+ * @return True if the method is valid, otherwise false
+ */
+ private static boolean isValidMethod(String method) {
+ List<String> allowedMethods =
+ Arrays.asList(TubeConst.ADD, TubeConst.DELETE,
TubeConst.MODIFY, TubeConst.SWITCH);
+ return allowedMethods.contains(method);
+ }
+
+ /**
+ * Processes the cluster-related request based on the specified method.
+ *
+ * @param method The method to perform (e.g., "add", "delete", "modify",
"switch")
+ * @param req The request body containing relevant data
+ * @return A TubeMQResult indicating the result of the operation
+ */
+ private TubeMQResult processClusterRequest(String method, String req) {
switch (method) {
case TubeConst.ADD:
+ LOGGER.info("Received 'add' operation with URL: {}", req);
return addNewCluster(gson.fromJson(req, AddClusterReq.class));
case TubeConst.DELETE:
+ LOGGER.info("Received 'delete' operation with URL: {}", req);
return deleteCluster(gson.fromJson(req,
DeleteClusterReq.class));
case TubeConst.MODIFY:
+ LOGGER.info("Received 'modify' operation with URL: {}", req);
return changeCluster(gson.fromJson(req, ClusterDto.class));
case TubeConst.SWITCH:
+ LOGGER.info("Received 'switch' operation with URL: {}", req);
return masterService.baseRequestMaster(gson.fromJson(req,
SwitchClusterReq.class));
default:
+ LOGGER.warn("Received unknown method: {}", method);
return
TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_METHOD);
}
}
diff --git
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
index f90dd21167..655f43952c 100644
---
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
+++
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/topic/TopicWebController.java
@@ -33,7 +33,10 @@ import
org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
import org.apache.inlong.tubemq.manager.service.interfaces.TopicService;
import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -42,6 +45,8 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
@RestController
@@ -49,10 +54,12 @@ import java.util.Map;
@Slf4j
public class TopicWebController {
+ private static final Logger LOGGER =
LogManager.getLogger(TopicWebController.class);
+
@Autowired
private NodeService nodeService;
- private Gson gson = new Gson();
+ private static final Gson gson = new Gson();
@Autowired
private MasterService masterService;
@@ -61,47 +68,189 @@ public class TopicWebController {
private TopicService topicService;
/**
- * broker method proxy
- * divides the operation on broker to different method
+ * Topic method proxy.
+ * Dispatches a topic operation, selected by the {@code method} request parameter, to the matching handler.
+ *
+ * @param method operation name; rejected unless {@code isValidMethod} accepts it
+ * @param req raw request body; must pass {@code isValidJson} before a handler deserializes it
+ * @return the selected handler's result, or an error result for an unsupported method or unparsable body
+ * @throws Exception declared because the CLONE handler declares it
*/
@RequestMapping(value = "")
public @ResponseBody TubeMQResult topicMethodProxy(@RequestParam String
method, @RequestBody String req)
throws Exception {
+ // Audit log of the incoming call.
+ // NOTE(review): the full request body is written at INFO level; confirm it can never carry credentials.
+ LOGGER.info("Received method for topicMethodProxy: {}", method);
+ LOGGER.info("Received req for topicMethodProxy: {}", req);
+
+ // Reject method names outside the supported list before touching the body
+ if (!isValidMethod(method)) {
+ return handleInvalidMethod(method);
+ }
+
+ // Reject bodies the JSON parser cannot read
+ if (!isValidJson(req)) {
+ return handleInvalidJson(req);
+ }
+
+ // Each case deserializes the body into its own request type inside the handler
switch (method) {
case TubeConst.ADD:
- return masterService.baseRequestMaster(gson.fromJson(req,
BatchAddTopicReq.class));
+ return handleAddTopicRequest(req);
case TubeConst.CLONE:
- return nodeService.cloneTopicToBrokers(gson.fromJson(req,
CloneTopicReq.class));
+ return handleCloneTopicRequest(req);
case TubeConst.AUTH_CONTROL:
- return setAuthControl(gson.fromJson(req,
SetAuthControlReq.class));
+ return handleAuthControlRequest(req);
case TubeConst.MODIFY:
- return masterService.baseRequestMaster(gson.fromJson(req,
ModifyTopicReq.class));
+ return handleModifyTopicRequest(req);
case TubeConst.DELETE:
case TubeConst.REMOVE:
- return masterService.baseRequestMaster(gson.fromJson(req,
DeleteTopicReq.class));
+ return handleDeleteTopicRequest(req);
case TubeConst.QUERY_CAN_WRITE:
- return queryCanWrite(gson.fromJson(req,
QueryCanWriteReq.class));
+ return handleQueryCanWriteRequest(req);
case TubeConst.PUBLISH:
- return masterService.baseRequestMaster(gson.fromJson(req,
SetPublishReq.class));
+ return handlePublishRequest(req);
case TubeConst.SUBSCRIBE:
- return masterService.baseRequestMaster(gson.fromJson(req,
SetSubscribeReq.class));
+ return handleSubscribeRequest(req);
default:
- return
TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_METHOD);
+ return handleInvalidMethod(method);
}
}
- private TubeMQResult setAuthControl(SetAuthControlReq req) {
- req.setMethod(TubeConst.SET_AUTH_CONTROL);
- req.setType(TubeConst.OP_MODIFY);
- req.setCreateUser(TubeConst.TUBEADMIN);
- return masterService.baseRequestMaster(req);
+    /**
+     * Rejects a request whose {@code method} parameter names no supported topic operation.
+     *
+     * <p>Returns the {@code NO_SUCH_METHOD} error constant, which is what the dispatcher's
+     * {@code default} branch returned before this helper was introduced, so callers keep
+     * receiving the error they already handle instead of a new free-form message.
+     *
+     * @param method the unsupported method value, logged for diagnosis
+     * @return an error result built from {@code TubeMQErrorConst.NO_SUCH_METHOD}
+     */
+    private TubeMQResult handleInvalidMethod(String method) {
+        LOGGER.warn("Invalid method value received: {}", method);
+        return TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_METHOD);
+    }
+
+    /**
+     * Builds the error answer for a request body that failed JSON validation.
+     *
+     * @param req the rejected request body, logged as received
+     * @return an error result carrying the text "Invalid JSON format."
+     */
+    private TubeMQResult handleInvalidJson(String req) {
+        final String reason = "Invalid JSON format.";
+        LOGGER.warn("Invalid JSON format received: {}", req);
+        return TubeMQResult.errorResult(reason);
+    }
+
+    /**
+     * Serves the 'ADD' method: deserializes the body and forwards it to the master.
+     *
+     * @param req the JSON request body
+     * @return whatever the master service reports for the request
+     */
+    private TubeMQResult handleAddTopicRequest(String req) {
+        BatchAddTopicReq addTopicReq = gson.fromJson(req, BatchAddTopicReq.class);
+        return masterService.baseRequestMaster(addTopicReq);
+    }
+
+    /**
+     * Serves the 'CLONE' method: deserializes the body and asks the node service to clone the topic to brokers.
+     *
+     * @param req the JSON request body
+     * @return whatever the node service reports for the request
+     * @throws Exception propagated from the node service
+     */
+    private TubeMQResult handleCloneTopicRequest(String req) throws Exception {
+        CloneTopicReq cloneTopicReq = gson.fromJson(req, CloneTopicReq.class);
+        return nodeService.cloneTopicToBrokers(cloneTopicReq);
     }
- private TubeMQResult queryCanWrite(QueryCanWriteReq req) {
- if (!req.legal()) {
+    /**
+     * Serves the 'AUTH_CONTROL' method.
+     *
+     * <p>The deserialized request is stamped with the set-auth-control method, the modify
+     * operation type and the admin user before it is forwarded to the master.
+     *
+     * @param req the JSON request body
+     * @return the master service result, or a {@code PARAM_ILLEGAL} error when the body
+     *         deserializes to nothing
+     */
+    private TubeMQResult handleAuthControlRequest(String req) {
+        SetAuthControlReq setAuthControlReq = gson.fromJson(req, SetAuthControlReq.class);
+        // Gson returns null for an empty body or a JSON null literal; without this guard
+        // the setter calls below would fail with a NullPointerException.
+        if (setAuthControlReq == null) {
+            return TubeMQResult.errorResult(TubeMQErrorConst.PARAM_ILLEGAL);
+        }
+        setAuthControlReq.setMethod(TubeConst.SET_AUTH_CONTROL);
+        setAuthControlReq.setType(TubeConst.OP_MODIFY);
+        setAuthControlReq.setCreateUser(TubeConst.TUBEADMIN);
+        return masterService.baseRequestMaster(setAuthControlReq);
+    }
+
+    /**
+     * Serves the 'MODIFY' method: deserializes the body and forwards it to the master.
+     *
+     * @param req the JSON request body
+     * @return whatever the master service reports for the request
+     */
+    private TubeMQResult handleModifyTopicRequest(String req) {
+        ModifyTopicReq modifyTopicReq = gson.fromJson(req, ModifyTopicReq.class);
+        return masterService.baseRequestMaster(modifyTopicReq);
+    }
+
+    /**
+     * Serves both the 'DELETE' and 'REMOVE' methods: deserializes the body and forwards it to the master.
+     *
+     * @param req the JSON request body
+     * @return whatever the master service reports for the request
+     */
+    private TubeMQResult handleDeleteTopicRequest(String req) {
+        DeleteTopicReq deleteTopicReq = gson.fromJson(req, DeleteTopicReq.class);
+        return masterService.baseRequestMaster(deleteTopicReq);
+    }
+
+ /**
+ * Handles 'QUERY_CAN_WRITE' method request.
+ *
+ * @param req The JSON request
+ * @return PARAM_ILLEGAL when the body deserializes to nothing or fails legal(); otherwise the topic service answer
+ */
+ private TubeMQResult handleQueryCanWriteRequest(String req) {
+ QueryCanWriteReq queryCanWriteReq = gson.fromJson(req,
QueryCanWriteReq.class);
+ // gson.fromJson returns null for an empty body or a JSON null literal; treat that as an illegal parameter instead of failing with a NullPointerException
+ if (queryCanWriteReq == null || !queryCanWriteReq.legal()) {
return TubeMQResult.errorResult(TubeMQErrorConst.PARAM_ILLEGAL);
}
- return topicService.queryCanWrite(req.getTopicName(),
req.getClusterId());
+ return topicService.queryCanWrite(queryCanWriteReq.getTopicName(),
queryCanWriteReq.getClusterId());
+ }
+
+    /**
+     * Serves the 'PUBLISH' method: deserializes the body and forwards it to the master.
+     *
+     * @param req the JSON request body
+     * @return whatever the master service reports for the request
+     */
+    private TubeMQResult handlePublishRequest(String req) {
+        SetPublishReq setPublishReq = gson.fromJson(req, SetPublishReq.class);
+        return masterService.baseRequestMaster(setPublishReq);
+    }
+
+    /**
+     * Serves the 'SUBSCRIBE' method: deserializes the body and forwards it to the master.
+     *
+     * @param req the JSON request body
+     * @return whatever the master service reports for the request
+     */
+    private TubeMQResult handleSubscribeRequest(String req) {
+        SetSubscribeReq setSubscribeReq = gson.fromJson(req, SetSubscribeReq.class);
+        return masterService.baseRequestMaster(setSubscribeReq);
+    }
+
+    /**
+     * Tells whether {@code method} names one of the topic operations this controller dispatches.
+     *
+     * @param method the method name taken from the request
+     * @return {@code true} when the name equals one of the supported constants, {@code false} otherwise
+     */
+    private static boolean isValidMethod(String method) {
+        String[] supported = {
+                TubeConst.ADD, TubeConst.CLONE, TubeConst.AUTH_CONTROL, TubeConst.MODIFY,
+                TubeConst.DELETE, TubeConst.REMOVE, TubeConst.QUERY_CAN_WRITE,
+                TubeConst.PUBLISH, TubeConst.SUBSCRIBE};
+        for (String candidate : supported) {
+            // constant on the left keeps a null method from throwing
+            if (candidate.equals(method)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Checks that {@code json} parses and that its top-level value is a JSON object.
+     *
+     * <p>Every handler deserializes the body into a request class, so a body that is merely
+     * parsable is not enough: Gson's parser accepts bare scalars, arrays, {@code null} and the
+     * empty string, and those would fail later or produce a {@code null} request object.
+     *
+     * @param json the request body to validate
+     * @return {@code true} only when the body parses to a JSON object; {@code false} for
+     *         malformed input, an empty body, or any other top-level value
+     */
+    private static boolean isValidJson(String json) {
+        try {
+            // Fully qualified so that no additional import is required.
+            com.google.gson.JsonElement parsed = gson.fromJson(json, com.google.gson.JsonElement.class);
+            // fromJson returns null for a null or empty string
+            return parsed != null && parsed.isJsonObject();
+        } catch (JsonSyntaxException e) {
+            LOGGER.error("JSON validation failed with exception: {}", e.getMessage());
+            return false;
+        }
     }
/**
diff --git
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
index f16ec6459e..d873c2e9cd 100644
---
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
+++
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/MasterServiceImpl.java
@@ -37,6 +37,10 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@@ -54,11 +58,112 @@ public class MasterServiceImpl implements MasterService {
@Autowired
MasterRepository masterRepository;
+ /**
+ * Sends an HTTP GET to the master at the given URL after validating the URL.
+ * The URL must pass isValidURL (parsable, http or https) and its host must pass
+ * isValidHostname (DNS resolution); otherwise an error result is returned and no request is sent.
+ * Each outcome on the validation paths is recorded through logRequestDetails.
+ *
+ * @param url The URL to request.
+ * @return TubeMQResult representing the result of the request.
+ */
@Override
public TubeMQResult requestMaster(String url) {
-
log.info("start to request {}", url);
+ // Start of the duration reported by logRequestDetails
+ long startTime = System.currentTimeMillis();
+
+ if (!isValidURL(url)) {
+ log.error("Invalid URL: {}", url);
+ logRequestDetails(url, startTime, "Invalid URL");
+ return TubeMQResult.errorResult("Invalid URL.");
+ }
+
+ String hostname = getHostnameFromURL(url);
+
+ // NOTE(review): this only checks that the host resolves; it does not restrict the target to known master addresses. Confirm whether that is sufficient.
+ if (!isValidHostname(hostname)) {
+ log.error("Invalid hostname: {}", hostname);
+ logRequestDetails(url, startTime, "Invalid hostname");
+ return TubeMQResult.errorResult("Invalid hostname.");
+ }
+
HttpGet httpGet = new HttpGet(url);
+ return executeHttpRequest(httpGet, url, startTime);
+ }
+
+    /**
+     * Writes one INFO line with the requested URL, its outcome and the elapsed time.
+     *
+     * @param url       the URL that was requested
+     * @param startTime wall-clock time in milliseconds at which the request began
+     * @param status    short description of the outcome
+     */
+    private void logRequestDetails(String url, long startTime, String status) {
+        long elapsedMillis = System.currentTimeMillis() - startTime;
+        log.info("Request Details - URL: {}, Status: {}, Duration: {} ms", url, status, elapsedMillis);
+    }
+
+    /**
+     * Pulls the host component out of a URL string.
+     *
+     * @param url the URL to inspect
+     * @return the host part, or {@code null} when the string cannot be parsed as a URL
+     */
+    private String getHostnameFromURL(String url) {
+        String host = null;
+        try {
+            host = new URL(url).getHost();
+        } catch (MalformedURLException e) {
+            log.warn("Failed to extract hostname from URL: {}", e.getMessage());
+        }
+        return host;
+    }
+
+    /**
+     * Accepts a hostname only when it is non-null and DNS resolution succeeds.
+     *
+     * @param hostname the hostname to check
+     * @return {@code true} when the name resolves, {@code false} for {@code null} or an unknown host
+     */
+    private boolean isValidHostname(String hostname) {
+        if (hostname != null) {
+            try {
+                InetAddress.getByName(hostname);
+                return true;
+            } catch (UnknownHostException e) {
+                log.error("DNS resolution failed for hostname: {}", hostname, e);
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Accepts a URL string only when it parses and its scheme is http or https.
+     *
+     * @param url the URL string to check
+     * @return {@code true} for a parsable http/https URL, {@code false} otherwise
+     */
+    private boolean isValidURL(String url) {
+        final String scheme;
+        try {
+            scheme = new URL(url).getProtocol().toLowerCase();
+        } catch (MalformedURLException e) {
+            log.warn("URL validation failed with exception: {}", e.getMessage());
+            return false;
+        }
+        return "http".equals(scheme) || "https".equals(scheme);
+    }
+
+ /**
+ * Executes an HTTP request and returns the TubeMQResult.
+ *
+ * @param httpGet The HttpGet request to execute.
+ * @param url The URL being requested.
+ * @param startTime The start time of the request.
+ * @return TubeMQResult representing the result of the HTTP request.
+ */
+ private TubeMQResult executeHttpRequest(HttpGet httpGet, String url, long
startTime) {
TubeMQResult defaultResult = new TubeMQResult();
try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
@@ -67,13 +172,16 @@ public class MasterServiceImpl implements MasterService {
StandardCharsets.UTF_8), TubeHttpResponse.class);
if (tubeResponse.getCode() == TubeConst.SUCCESS_CODE
&& tubeResponse.getErrCode() == TubeConst.SUCCESS_CODE) {
+ logRequestDetails(url, startTime, "Success");
return defaultResult;
} else {
defaultResult = errorResult(tubeResponse.getErrMsg());
+ logRequestDetails(url, startTime, "Failed: " +
tubeResponse.getErrMsg());
}
} catch (Exception ex) {
log.error("exception caught while requesting broker status", ex);
defaultResult = TubeMQResult.errorResult(ex.getMessage());
+ logRequestDetails(url, startTime, "Exception: " + ex.getMessage());
}
return defaultResult;
}
@@ -87,9 +195,22 @@ public class MasterServiceImpl implements MasterService {
@Override
public String queryMaster(String url) {
log.info("start to request {}", url);
+
+ if (!isValidURL(url)) {
+ log.error("Invalid URL: {}", url);
+ return gson.toJson(TubeMQResult.errorResult("Invalid URL."));
+ }
+
HttpGet httpGet = new HttpGet(url);
TubeMQResult defaultResult = new TubeMQResult();
try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+ // Location is absent on ordinary responses; indexing getHeaders("Location")[0] threw there and turned every successful query into an error result
+ String redirectedUrl = response.containsHeader("Location") ? response.getFirstHeader("Location").getValue() : null;
+ if (redirectedUrl != null && !url.equals(redirectedUrl) && !isValidURL(redirectedUrl)) {
+ log.error("Invalid redirected URL: {}", redirectedUrl);
+ return gson.toJson(TubeMQResult.errorResult("Invalid redirected URL."));
+ }
+
// return result json to response
return EntityUtils.toString(response.getEntity());
} catch (Exception ex) {
@@ -97,8 +218,8 @@ public class MasterServiceImpl implements MasterService {
defaultResult.setErrCode(-1);
defaultResult.setResult(false);
defaultResult.setErrMsg(ex.getMessage());
+ return gson.toJson(defaultResult);
}
- return gson.toJson(defaultResult);
}
@Override
diff --git
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
index 3bfa6a1bb1..38a3dee833 100644
---
a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
+++
b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicServiceImpl.java
@@ -55,9 +55,13 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* node service to query broker/master/standby status of tube cluster.
@@ -70,6 +74,11 @@ public class TopicServiceImpl implements TopicService {
public static final int MINIMUN_TOPIC_RUN_PART = 1;
private final CloseableHttpClient httpclient = HttpClients.createDefault();
private final Gson gson = new Gson();
+ // Matches a '%' character, any control character U+0000-U+001F, or any character from U+007F through U+FFFF.
+ private static final Pattern SPECIAL_CHAR_PATTERN =
Pattern.compile("[%\\x00-\\x1F\\x7F-\\uFFFF]");
+ // Longest topic name accepted by validateTopicName.
+ private static final int MAX_TOPIC_NAME_LENGTH = 255;
+ // Substrings rejected by containsDangerousChars; the input is lower-cased before it is compared against them.
+ private static final String[] DANGEROUS_KEYWORDS = {
+ "exec", "system", "cmd", "shell", "php", "perl", "python", "ruby",
"javascript", "java"
+ };
@Value("${manager.broker.webPort:8081}")
private int brokerWebPort;
@@ -111,25 +120,140 @@ public class TopicServiceImpl implements TopicService {
return TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_GROUP);
}
+ /**
+ * Requests and retrieves topic view information from a TubeMQ cluster.
+ * The master entry and the topic name are validated first, then the URL built by
+ * buildTopicViewURL is fetched with an HTTP GET and the response is parsed into a TopicView.
+ *
+ * NOTE(review): the previous implementation accepted a blank topicName and then queried
+ * without the topic-name parameter; validateTopicName now rejects blank names. Confirm that no caller relies on the old behavior.
+ *
+ * @param clusterId The ID of the TubeMQ cluster.
+ * @param topicName The name of the topic to retrieve information for.
+ * @return The TopicView object containing topic information.
+ * @throws IllegalArgumentException If any validation checks fail.
+ * @throws RuntimeException If an exception occurs during the
request.
+ */
@Override
public TopicView requestTopicViewInfo(Long clusterId, String topicName) {
MasterEntry masterNode = masterService.getMasterNode(clusterId);
- String url = TubeConst.SCHEMA + masterNode.getIp() + ":" +
masterNode.getWebPort()
- + TubeConst.TOPIC_VIEW;
- if (StringUtils.isNotBlank(topicName)) {
- url = StringUtils.join(url, TubeConst.TOPIC_NAME, topicName);
- }
+ validateMasterEntry(masterNode, clusterId);
+ validateTopicName(topicName, clusterId);
+
+ String url = buildTopicViewURL(masterNode, topicName, clusterId);
HttpGet httpget = new HttpGet(url);
try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- return gson.fromJson(new
InputStreamReader(response.getEntity().getContent(),
- StandardCharsets.UTF_8),
- TopicView.class);
+ return parseTopicViewResponse(response);
} catch (Exception ex) {
- log.error("exception caught while requesting group status", ex);
+ handleRequestException(clusterId, topicName, ex);
throw new RuntimeException(ex.getMessage());
}
}
+    /**
+     * Ensures the master entry is present and carries a non-blank IP and a positive web port.
+     *
+     * @param masterNode the master entry looked up for the cluster
+     * @param clusterId  the cluster ID, used only in the log message
+     * @throws IllegalArgumentException when the entry is missing or incomplete
+     */
+    private void validateMasterEntry(MasterEntry masterNode, Long clusterId) {
+        boolean usable = masterNode != null
+                && StringUtils.isNotBlank(masterNode.getIp())
+                && masterNode.getWebPort() > 0;
+        if (usable) {
+            return;
+        }
+        log.error("Invalid MasterEntry: ClusterId = {}", clusterId);
+        throw new IllegalArgumentException("Invalid MasterEntry.");
+    }
+
+    /**
+     * Ensures the topic name is non-blank, within the length limit and free of rejected content.
+     *
+     * @param topicName the topic name to check
+     * @param clusterId the cluster ID, used only in the log message
+     * @throws IllegalArgumentException when the topic name fails any of the checks
+     */
+    private void validateTopicName(String topicName, Long clusterId) {
+        // the blank check comes first so the later checks never see null
+        boolean rejected = StringUtils.isBlank(topicName)
+                || topicName.length() > MAX_TOPIC_NAME_LENGTH
+                || containsDangerousChars(topicName);
+        if (!rejected) {
+            return;
+        }
+        log.error("Invalid topicName: ClusterId = {}, TopicName = {}", clusterId, topicName);
+        throw new IllegalArgumentException("Invalid topicName.");
+    }
+
+    /**
+     * Builds the URL for requesting topic view information.
+     *
+     * <p>The base URL (schema, master IP, web port, topic-view path) is checked with
+     * {@code isValidURL}. The topic name is then appended as a URL-encoded query value, so
+     * characters such as {@code &}, {@code #}, {@code =} or spaces cannot add or alter
+     * parameters of the request sent to the master. Names made of letters, digits,
+     * {@code _}, {@code -}, {@code .} and {@code *} are unchanged by the encoding.
+     *
+     * @param masterNode The MasterEntry representing the TubeMQ master node.
+     * @param topicName  The name of the topic.
+     * @param clusterId  The ID of the TubeMQ cluster, used only in the log message.
+     * @return The constructed URL.
+     * @throws IllegalArgumentException If the base URL is invalid.
+     * @throws IllegalStateException    If the platform does not support UTF-8.
+     */
+    private String buildTopicViewURL(MasterEntry masterNode, String topicName, Long clusterId) {
+        String url = TubeConst.SCHEMA + masterNode.getIp() + ":" + masterNode.getWebPort() + TubeConst.TOPIC_VIEW;
+        if (!isValidURL(url)) {
+            log.error("Invalid URL: ClusterId = {}, URL = {}", clusterId, url);
+            throw new IllegalArgumentException("Invalid URL.");
+        }
+        String encodedTopicName;
+        try {
+            // Fully qualified so that no additional import is required.
+            encodedTopicName = java.net.URLEncoder.encode(topicName, StandardCharsets.UTF_8.name());
+        } catch (java.io.UnsupportedEncodingException e) {
+            throw new IllegalStateException("UTF-8 encoding is not supported", e);
+        }
+        return url + TubeConst.TOPIC_NAME + encodedTopicName;
+    }
+
+    /**
+     * Reads the response entity as UTF-8 JSON and converts it into a TopicView.
+     *
+     * @param response the HTTP response holding the topic view payload
+     * @return the deserialized TopicView
+     * @throws Exception when the entity content cannot be read or parsed
+     */
+    private TopicView parseTopicViewResponse(CloseableHttpResponse response) throws Exception {
+        InputStreamReader bodyReader =
+                new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
+        return gson.fromJson(bodyReader, TopicView.class);
+    }
+
+    /**
+     * Logs a failure of the topic view request together with the cluster and topic it concerned.
+     *
+     * @param clusterId the cluster the request targeted
+     * @param topicName the topic the request targeted
+     * @param ex        the failure, logged with its stack trace
+     */
+    private void handleRequestException(Long clusterId, String topicName, Exception ex) {
+        final String template =
+                "Exception caught while requesting topic view: ClusterId = {}, TopicName = {}";
+        log.error(template, clusterId, topicName, ex);
+    }
+
+    /**
+     * Reports whether the lower-cased input contains content this service refuses in a topic name:
+     * the sequence "://", any entry of DANGEROUS_KEYWORDS, or a character matched by SPECIAL_CHAR_PATTERN.
+     *
+     * @param input the string to inspect; must not be null
+     * @return {@code true} when any of the three checks matches, {@code false} otherwise
+     */
+    private boolean containsDangerousChars(String input) {
+        String lowered = input.toLowerCase();
+        return lowered.contains("://")
+                || StringUtils.containsAny(lowered, DANGEROUS_KEYWORDS)
+                || SPECIAL_CHAR_PATTERN.matcher(lowered).find();
+    }
+
+    /**
+     * Reports whether the string can be parsed by {@link URL}.
+     *
+     * @param url the URL string to check
+     * @return {@code true} when parsing succeeds, {@code false} when it is malformed
+     */
+    private boolean isValidURL(String url) {
+        boolean wellFormed = true;
+        try {
+            new URL(url);
+        } catch (MalformedURLException e) {
+            log.warn("URL validation failed with exception: {}", e.getMessage());
+            wellFormed = false;
+        }
+        return wellFormed;
+    }
+
@Override
public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
MasterEntry master =
masterService.getMasterNode(Long.valueOf(req.getClusterId()));