This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d4fc2b19c [INLONG-7379][Manager] Add a database name for MySQLSink 
(#7380)
d4fc2b19c is described below

commit d4fc2b19cc6a779c06dfecb7d81f727d5c054db2
Author: fuweng11 <[email protected]>
AuthorDate: Thu Feb 16 16:15:30 2023 +0800

    [INLONG-7379][Manager] Add a database name for MySQLSink (#7380)
---
 .../manager/pojo/node/mysql/MySQLDataNodeDTO.java  | 13 ++++++++
 .../inlong/manager/pojo/sink/mysql/MySQLSink.java  |  5 ++-
 .../manager/pojo/sink/mysql/MySQLSinkDTO.java      | 39 ++++++++++++++++++++--
 .../manager/pojo/sink/mysql/MySQLSinkRequest.java  |  5 ++-
 .../manager/pojo/sort/util/LoadNodeUtils.java      |  7 ++--
 .../manager/pojo/sink/mysql/MySQLSinkDTOTest.java  | 14 ++++++++
 .../service/node/mysql/MySQLDataNodeOperator.java  |  2 +-
 .../resource/sink/mysql/MySQLResourceOperator.java |  3 +-
 .../service/sink/mysql/MySQLSinkOperator.java      |  9 ++---
 9 files changed, 83 insertions(+), 14 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
index 633c7a67b..d4e0b6f50 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
@@ -23,6 +23,7 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -42,6 +43,7 @@ import javax.validation.constraints.NotNull;
 public class MySQLDataNodeDTO {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MySQLDataNodeDTO.class);
+    private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
 
     @ApiModelProperty("URL of backup DB server")
     private String backupUrl;
@@ -66,4 +68,15 @@ public class MySQLDataNodeDTO {
                     String.format("Failed to parse extParams for MySQL node: 
%s", e.getMessage()));
         }
     }
+
+    /**
+     * Build a MySQL JDBC URL from a data node address such as {@code host:port}.
+     *
+     * @param url the address to convert; it may already carry the MySQL JDBC prefix
+     * @return the address with the MySQL JDBC prefix prepended when it is not blank
+     *         and does not start with that prefix yet; otherwise the address unchanged
+     */
+    public static String convertToJdbcurl(String url) {
+        // Blank input and already-prefixed URLs are handed back untouched
+        if (StringUtils.isBlank(url) || url.startsWith(MYSQL_JDBC_PREFIX)) {
+            return url;
+        }
+        return MYSQL_JDBC_PREFIX + url;
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
index 6a5c148fc..7ce6a06d5 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSink.java
@@ -42,7 +42,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = SinkType.MYSQL)
 public class MySQLSink extends StreamSink {
 
-    @ApiModelProperty("MySQL JDBC URL, such as 
jdbc:mysql://host:port/database")
+    @ApiModelProperty("MySQL JDBC URL, such as jdbc:mysql://host:port")
     private String jdbcUrl;
 
     @ApiModelProperty("Username for JDBC URL")
@@ -51,6 +51,9 @@ public class MySQLSink extends StreamSink {
     @ApiModelProperty("User password")
     private String password;
 
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
     @ApiModelProperty("Target table name")
     private String tableName;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index 7cc95f14f..77b4128ec 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -37,6 +37,8 @@ import java.net.URLDecoder;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * MySQL sink info
@@ -61,8 +63,9 @@ public class MySQLSinkDTO {
     };
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MySQLSinkDTO.class);
+    private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
 
-    @ApiModelProperty("MySQL JDBC URL, such as 
jdbc:mysql://host:port/database")
+    @ApiModelProperty("MySQL JDBC URL, such as jdbc:mysql://host:port")
     private String jdbcUrl;
 
     @ApiModelProperty("Username for JDBC URL")
@@ -71,6 +74,9 @@ public class MySQLSinkDTO {
     @ApiModelProperty("User password")
     private String password;
 
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
     @ApiModelProperty("Target table name")
     private String tableName;
 
@@ -94,6 +100,7 @@ public class MySQLSinkDTO {
                 .username(request.getUsername())
                 .password(request.getPassword())
                 .primaryKey(request.getPrimaryKey())
+                .databaseName(request.getDatabaseName())
                 .tableName(request.getTableName())
                 .properties(request.getProperties())
                 .build();
@@ -123,8 +130,7 @@ public class MySQLSinkDTO {
      */
     public static MySQLTableInfo getTableInfo(MySQLSinkDTO mySQLSink, 
List<MySQLColumnInfo> columnList) {
         MySQLTableInfo tableInfo = new MySQLTableInfo();
-        String dbName = getDbNameFromUrl(mySQLSink.getJdbcUrl());
-        tableInfo.setDbName(dbName);
+        tableInfo.setDbName(mySQLSink.getDatabaseName());
         tableInfo.setTableName(mySQLSink.getTableName());
         tableInfo.setPrimaryKey(mySQLSink.getPrimaryKey());
         tableInfo.setColumns(columnList);
@@ -178,6 +184,33 @@ public class MySQLSinkDTO {
         return database;
     }
 
+    /**
+     * Put the database name into a MySQL JDBC URL that starts with
+     * {@code jdbc:mysql://host:port}.
+     * <p>
+     * The result is {@code jdbc:mysql://host:port/databaseName}, followed by the
+     * query string of the given URL (everything from the first {@code ?}), if any.
+     *
+     * @param jdbcUrl the JDBC URL, such as jdbc:mysql://host:port
+     * @param databaseName the database name to place after host:port
+     * @return the URL containing the database name, or the given URL unchanged
+     *         when the URL or the database name is blank
+     * @throws BusinessException if the URL does not contain jdbc:mysql://host:port
+     */
+    public static String setDbNameToUrl(String jdbcUrl, String databaseName) {
+        // Without a database name there is nothing to insert: keep the URL as it is
+        // instead of producing ".../null" or a bare trailing slash.
+        if (StringUtils.isBlank(jdbcUrl) || StringUtils.isBlank(databaseName)) {
+            return jdbcUrl;
+        }
+        // The port is mandatory here: an optional port group would be null for
+        // "host:" and end up in the result as the literal text "null".
+        Pattern urlPattern = Pattern.compile(
+                "jdbc:mysql://(?<host>[a-zA-Z0-9-//.]+):(?<port>[0-9]+)");
+        Matcher urlMatcher = urlPattern.matcher(jdbcUrl);
+        if (!urlMatcher.find()) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    "MySQL JDBC URL was invalid, it should like jdbc:mysql://host:port");
+        }
+        StringBuilder resultUrl = new StringBuilder(MYSQL_JDBC_PREFIX)
+                .append(urlMatcher.group("host"))
+                .append(InlongConstants.COLON)
+                .append(urlMatcher.group("port"))
+                .append(InlongConstants.SLASH)
+                .append(databaseName);
+        // Carry over the connection parameters, if the URL has any
+        int queryStart = jdbcUrl.indexOf('?');
+        if (queryStart >= 0) {
+            resultUrl.append(jdbcUrl, queryStart, jdbcUrl.length());
+        }
+        return resultUrl.toString();
+    }
+
     /**
      * Filter the sensitive params for the given URL.
      *
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
index bf34914e6..54aebb945 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkRequest.java
@@ -36,7 +36,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = SinkType.MYSQL)
 public class MySQLSinkRequest extends SinkRequest {
 
-    @ApiModelProperty("MySQL JDBC URL, such as 
jdbc:mysql://host:port/database")
+    @ApiModelProperty("MySQL JDBC URL, such as jdbc:mysql://host:port")
     private String jdbcUrl;
 
     @ApiModelProperty("Username for JDBC URL")
@@ -45,6 +45,9 @@ public class MySQLSinkRequest extends SinkRequest {
     @ApiModelProperty("User password")
     private String password;
 
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
     @ApiModelProperty("Target table name")
     private String tableName;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 2c49597c6..05a63abc4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -39,6 +39,7 @@ import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
+import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
 import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
 import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
@@ -600,7 +601,7 @@ public class LoadNodeUtils {
                 null,
                 null,
                 properties,
-                mysqlSink.getJdbcUrl(),
+                MySQLSinkDTO.setDbNameToUrl(mysqlSink.getJdbcUrl(), 
mysqlSink.getDatabaseName()),
                 mysqlSink.getUsername(),
                 mysqlSink.getPassword(),
                 mysqlSink.getTableName(),
@@ -717,9 +718,9 @@ public class LoadNodeUtils {
     /**
      * Parse format
      *
-     * @param formatName        data serialization, support: csv, json, canal, 
avro, etc
+     * @param formatName data serialization, support: csv, json, canal, avro, 
etc
      * @param wrapWithInlongMsg whether wrap content with {@link 
InLongMsgFormat}
-     * @param separatorStr      the separator of data content
+     * @param separatorStr the separator of data content
      * @param ignoreParseErrors whether ignore deserialization error data
      * @return the format for serialized content
      */
diff --git 
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
index 79f357daf..568f9e28a 100644
--- 
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
+++ 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
@@ -74,4 +74,18 @@ public class MySQLSinkDTOTest {
                 originUrl);
     }
 
+    /**
+     * Check that the database name is inserted after host:port, with and without
+     * query parameters and with a trailing slash.
+     */
+    @Test
+    public void testSetDbNameToUrl() {
+        final String dbName = "test_db";
+
+        // URL with query parameters: they must follow the inserted database name
+        String urlWithParams =
+                "jdbc:mysql://127.0.0.1:3306?autoDeserialize=TRue&allowLoadLocalInfile=TRue&autoReconnect=true&allowUrlInLocalInfile=TRue&allowLoadLocalInfileInPath=/";
+        Assertions.assertEquals(
+                "jdbc:mysql://127.0.0.1:3306/test_db?autoDeserialize=TRue&allowLoadLocalInfile=TRue&autoReconnect=true&allowUrlInLocalInfile=TRue&allowLoadLocalInfileInPath=/",
+                MySQLSinkDTO.setDbNameToUrl(urlWithParams, dbName));
+
+        // Plain host:port
+        Assertions.assertEquals("jdbc:mysql://127.0.0.1:3306/test_db",
+                MySQLSinkDTO.setDbNameToUrl("jdbc:mysql://127.0.0.1:3306", dbName));
+
+        // host:port followed by a trailing slash
+        Assertions.assertEquals("jdbc:mysql://127.0.0.1:3306/test_db",
+                MySQLSinkDTO.setDbNameToUrl("jdbc:mysql://127.0.0.1:3306/", dbName));
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index 8f3c04817..7bc1f8f4b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -90,7 +90,7 @@ public class MySQLDataNodeOperator extends 
AbstractDataNodeOperator {
 
     @Override
     public Boolean testConnection(DataNodeRequest request) {
-        String jdbcUrl = request.getUrl();
+        String jdbcUrl = MySQLDataNodeDTO.convertToJdbcurl(request.getUrl());
         String username = request.getUsername();
         String password = request.getToken();
         Preconditions.expectNotBlank(jdbcUrl, ErrorCodeEnum.INVALID_PARAMETER, 
"connection jdbcUrl cannot be empty");
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index 8469034ce..510891d7e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
@@ -132,7 +133,7 @@ public class MySQLResourceOperator implements 
SinkResourceOperator {
                     "mysql jdbc url not specified and data node is empty");
             DataNodeInfo dataNodeInfo = 
dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, mysqlInfo);
-            mysqlInfo.setJdbcUrl(dataNodeInfo.getUrl());
+            
mysqlInfo.setJdbcUrl(MySQLDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl()));
             mysqlInfo.setPassword(dataNodeInfo.getToken());
         }
         return mysqlInfo;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index 2d5a07eb9..1ddf7801b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -19,18 +19,19 @@ package org.apache.inlong.manager.service.sink.mysql;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +93,7 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
             DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(
                     entity.getDataNodeName(), entity.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
-            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            
dto.setJdbcUrl(MySQLDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl()));
             dto.setPassword(dataNodeInfo.getToken());
         }
         CommonBeanUtils.copyProperties(entity, sink, true);

Reply via email to