This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d4fc2b19c [INLONG-7379][Manager] Add a database name for MySQLSink
(#7380)
d4fc2b19c is described below
commit d4fc2b19cc6a779c06dfecb7d81f727d5c054db2
Author: fuweng11 <[email protected]>
AuthorDate: Thu Feb 16 16:15:30 2023 +0800
[INLONG-7379][Manager] Add a database name for MySQLSink (#7380)
---
.../manager/pojo/node/mysql/MySQLDataNodeDTO.java | 13 ++++++++
.../inlong/manager/pojo/sink/mysql/MySQLSink.java | 5 ++-
.../manager/pojo/sink/mysql/MySQLSinkDTO.java | 39 ++++++++++++++++++++--
.../manager/pojo/sink/mysql/MySQLSinkRequest.java | 5 ++-
.../manager/pojo/sort/util/LoadNodeUtils.java | 7 ++--
.../manager/pojo/sink/mysql/MySQLSinkDTOTest.java | 14 ++++++++
.../service/node/mysql/MySQLDataNodeOperator.java | 2 +-
.../resource/sink/mysql/MySQLResourceOperator.java | 3 +-
.../service/sink/mysql/MySQLSinkOperator.java | 9 ++---
9 files changed, 83 insertions(+), 14 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
index 633c7a67b..d4e0b6f50 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
@@ -23,6 +23,7 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -42,6 +43,7 @@ import javax.validation.constraints.NotNull;
public class MySQLDataNodeDTO {
private static final Logger LOGGER =
LoggerFactory.getLogger(MySQLDataNodeDTO.class);
+ private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
@ApiModelProperty("URL of backup DB server")
private String backupUrl;
@@ -66,4 +68,15 @@ public class MySQLDataNodeDTO {
String.format("Failed to parse extParams for MySQL node:
%s", e.getMessage()));
}
}
+
+ /**
+ * Convert ip:post to jdbcurl.
+ */
+ public static String convertToJdbcurl(String url) {
+ String jdbcUrl = url;
+ if (StringUtils.isNotBlank(jdbcUrl) &&
!jdbcUrl.startsWith(MYSQL_JDBC_PREFIX)) {
+ jdbcUrl = MYSQL_JDBC_PREFIX + jdbcUrl;
+ }
+ return jdbcUrl;
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
index 6a5c148fc..7ce6a06d5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
@@ -42,7 +42,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = SinkType.MYSQL)
public class MySQLSink extends StreamSink {
- @ApiModelProperty("MySQL JDBC URL, such as
jdbc:mysql://host:port/database")
+ @ApiModelProperty("MySQL JDBC URL, such as jdbc:mysql://host:port")
private String jdbcUrl;
@ApiModelProperty("Username for JDBC URL")
@@ -51,6 +51,9 @@ public class MySQLSink extends StreamSink {
@ApiModelProperty("User password")
private String password;
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
@ApiModelProperty("Target table name")
private String tableName;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index 7cc95f14f..77b4128ec 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -37,6 +37,8 @@ import java.net.URLDecoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* MySQL sink info
@@ -61,8 +63,9 @@ public class MySQLSinkDTO {
};
private static final Logger LOGGER =
LoggerFactory.getLogger(MySQLSinkDTO.class);
+ private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
- @ApiModelProperty("MySQL JDBC URL, such as
jdbc:mysql://host:port/database")
+ @ApiModelProperty("MySQL JDBC URL, such as jdbc:mysql://host:port")
private String jdbcUrl;
@ApiModelProperty("Username for JDBC URL")
@@ -71,6 +74,9 @@ public class MySQLSinkDTO {
@ApiModelProperty("User password")
private String password;
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
@ApiModelProperty("Target table name")
private String tableName;
@@ -94,6 +100,7 @@ public class MySQLSinkDTO {
.username(request.getUsername())
.password(request.getPassword())
.primaryKey(request.getPrimaryKey())
+ .databaseName(request.getDatabaseName())
.tableName(request.getTableName())
.properties(request.getProperties())
.build();
@@ -123,8 +130,7 @@ public class MySQLSinkDTO {
*/
public static MySQLTableInfo getTableInfo(MySQLSinkDTO mySQLSink,
List<MySQLColumnInfo> columnList) {
MySQLTableInfo tableInfo = new MySQLTableInfo();
- String dbName = getDbNameFromUrl(mySQLSink.getJdbcUrl());
- tableInfo.setDbName(dbName);
+ tableInfo.setDbName(mySQLSink.getDatabaseName());
tableInfo.setTableName(mySQLSink.getTableName());
tableInfo.setPrimaryKey(mySQLSink.getPrimaryKey());
tableInfo.setColumns(columnList);
@@ -178,6 +184,33 @@ public class MySQLSinkDTO {
return database;
}
+ public static String setDbNameToUrl(String jdbcUrl, String databaseName) {
+ if (StringUtils.isBlank(jdbcUrl)) {
+ return jdbcUrl;
+ }
+ String pattern =
"jdbc:mysql://(?<host>[a-zA-Z0-9-//.]+):(?<port>[0-9]+)?(?<ext>)";
+ Pattern namePattern = Pattern.compile(pattern);
+ Matcher dateMatcher = namePattern.matcher(jdbcUrl);
+ StringBuilder resultUrl;
+ if (dateMatcher.find()) {
+ String host = dateMatcher.group("host");
+ String port = dateMatcher.group("port");
+ resultUrl = new StringBuilder().append(MYSQL_JDBC_PREFIX)
+ .append(host)
+ .append(InlongConstants.COLON)
+ .append(port)
+ .append(InlongConstants.SLASH)
+ .append(databaseName);
+ } else {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "MySQL JDBC URL was invalid, it should like
jdbc:mysql://host:port");
+ }
+ if (jdbcUrl.contains("?")) {
+ resultUrl.append(jdbcUrl.substring(jdbcUrl.indexOf("?")));
+ }
+ return resultUrl.toString();
+ }
+
/**
* Filter the sensitive params for the given URL.
*
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
index bf34914e6..54aebb945 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
@@ -36,7 +36,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = SinkType.MYSQL)
public class MySQLSinkRequest extends SinkRequest {
- @ApiModelProperty("MySQL JDBC URL, such as
jdbc:mysql://host:port/database")
+ @ApiModelProperty("MySQL JDBC URL, such as jdbc:mysql://host:port")
private String jdbcUrl;
@ApiModelProperty("Username for JDBC URL")
@@ -45,6 +45,9 @@ public class MySQLSinkRequest extends SinkRequest {
@ApiModelProperty("User password")
private String password;
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
@ApiModelProperty("Target table name")
private String tableName;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 2c49597c6..05a63abc4 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -39,6 +39,7 @@ import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
+import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
@@ -600,7 +601,7 @@ public class LoadNodeUtils {
null,
null,
properties,
- mysqlSink.getJdbcUrl(),
+ MySQLSinkDTO.setDbNameToUrl(mysqlSink.getJdbcUrl(),
mysqlSink.getDatabaseName()),
mysqlSink.getUsername(),
mysqlSink.getPassword(),
mysqlSink.getTableName(),
@@ -717,9 +718,9 @@ public class LoadNodeUtils {
/**
* Parse format
*
- * @param formatName data serialization, support: csv, json, canal,
avro, etc
+ * @param formatName data serialization, support: csv, json, canal, avro,
etc
* @param wrapWithInlongMsg whether wrap content with {@link
InLongMsgFormat}
- * @param separatorStr the separator of data content
+ * @param separatorStr the separator of data content
* @param ignoreParseErrors whether ignore deserialization error data
* @return the format for serialized content
*/
diff --git
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
index 79f357daf..568f9e28a 100644
---
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
+++
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
@@ -74,4 +74,18 @@ public class MySQLSinkDTOTest {
originUrl);
}
+ @Test
+ public void testSetDbNameToUrl() {
+ String originUrl = MySQLSinkDTO.setDbNameToUrl(
+
"jdbc:mysql://127.0.0.1:3306?autoDeserialize=TRue&allowLoadLocalInfile=TRue&autoReconnect=true&allowUrlInLocalInfile=TRue&allowLoadLocalInfileInPath=/",
+ "test_db");
+ Assertions.assertEquals(
+
"jdbc:mysql://127.0.0.1:3306/test_db?autoDeserialize=TRue&allowLoadLocalInfile=TRue&autoReconnect=true&allowUrlInLocalInfile=TRue&allowLoadLocalInfileInPath=/",
+ originUrl);
+ originUrl = MySQLSinkDTO.setDbNameToUrl("jdbc:mysql://127.0.0.1:3306",
"test_db");
+ Assertions.assertEquals("jdbc:mysql://127.0.0.1:3306/test_db",
originUrl);
+ originUrl =
MySQLSinkDTO.setDbNameToUrl("jdbc:mysql://127.0.0.1:3306/", "test_db");
+ Assertions.assertEquals("jdbc:mysql://127.0.0.1:3306/test_db",
originUrl);
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index 8f3c04817..7bc1f8f4b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -90,7 +90,7 @@ public class MySQLDataNodeOperator extends
AbstractDataNodeOperator {
@Override
public Boolean testConnection(DataNodeRequest request) {
- String jdbcUrl = request.getUrl();
+ String jdbcUrl = MySQLDataNodeDTO.convertToJdbcurl(request.getUrl());
String username = request.getUsername();
String password = request.getToken();
Preconditions.expectNotBlank(jdbcUrl, ErrorCodeEnum.INVALID_PARAMETER,
"connection jdbcUrl cannot be empty");
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index 8469034ce..510891d7e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
@@ -132,7 +133,7 @@ public class MySQLResourceOperator implements
SinkResourceOperator {
"mysql jdbc url not specified and data node is empty");
DataNodeInfo dataNodeInfo =
dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, mysqlInfo);
- mysqlInfo.setJdbcUrl(dataNodeInfo.getUrl());
+
mysqlInfo.setJdbcUrl(MySQLDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl()));
mysqlInfo.setPassword(dataNodeInfo.getToken());
}
return mysqlInfo;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index 2d5a07eb9..1ddf7801b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -19,18 +19,19 @@ package org.apache.inlong.manager.service.sink.mysql;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), entity.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
- dto.setJdbcUrl(dataNodeInfo.getUrl());
+
dto.setJdbcUrl(MySQLDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl()));
dto.setPassword(dataNodeInfo.getToken());
}
CommonBeanUtils.copyProperties(entity, sink, true);