mrliufox opened a new pull request, #14929:
URL: https://github.com/apache/dolphinscheduler/pull/14929
## Purpose of the pull request
Add support for the Xugu database as a data source.
## Brief change log
## 1. dolphinscheduler-dao module: add the init SQL
Add `dolphinscheduler_xugu.sql` under `resources/sql`.
## 2. Add the Xugu JDBC driver and upgrade mybatis-plus
### 2.1 Root pom.xml: add the Xugu JDBC driver and set the mybatis-plus version
```xml
<!-- version properties -->
<xugu-jdbc.version>12.1.11</xugu-jdbc.version>
<mybatis-plus.version>3.3.0</mybatis-plus.version>

<!-- driver dependency -->
<dependency>
    <groupId>com.xugu</groupId>
    <artifactId>xugu-jdbc</artifactId>
    <version>${xugu-jdbc.version}</version>
    <scope>compile</scope>
</dependency>
```
### 2.2 dolphinscheduler-dao, dolphinscheduler-bom, dolphinscheduler-datasource-xugu pom.xml: add the Xugu JDBC driver
```xml
<dependency>
    <groupId>com.xugu</groupId>
    <artifactId>xugu-jdbc</artifactId>
</dependency>
```
### 2.3 dolphinscheduler-master, dolphinscheduler-worker resources/application.yml: add the following settings
```yaml
# Override by profile
---
spring:
  config:
    activate:
      on-profile: xugu
  datasource:
    driver-class-name: com.xugu.xugu-jdbc.Driver
    url: jdbc:xugu://127.0.0.1:5138/dolphinscheduler
    username: SYSDBA
    password: SYSDBA
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
```
### 2.4 dolphinscheduler-alert-server, dolphinscheduler-api resources/application.yml: add the following settings
```yaml
# Override by profile
---
spring:
  config:
    activate:
      on-profile: xugu
  datasource:
    driver-class-name: com.xugu.xugu-jdbc.Driver
    url: jdbc:xugu://127.0.0.1:5138/dolphinscheduler
    username: SYSDBA
    password: SYSDBA
```
### 2.5 dolphinscheduler-standalone-server resources/application.yml: add the following settings
```yaml
---
spring:
  config:
    activate:
      on-profile: xugu
  sql:
    init:
      schema-locations: classpath:sql/dolphinscheduler_xugu.sql
  datasource:
    driver-class-name: com.xugu.xugu-jdbc.Driver
    url: jdbc:xugu://127.0.0.1:5138/dolphinscheduler
    username: SYSDBA
    password: SYSDBA
```
### 2.6 dolphinscheduler-tools resources/application.yml: add the following settings
```yaml
---
spring:
  config:
    activate:
      on-profile: xugu
  datasource:
    driver-class-name: com.xugu.xugu-jdbc.Driver
    url: jdbc:xugu://127.0.0.1:5138/dolphinscheduler
```
## 3. Source code changes for compatibility
### 3.1 dolphinscheduler-dao module: update CommandMapper.xml, id=queryCommandPageBySlot
```xml
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
    select *
    from t_ds_command
    where mod(id, #{masterCount}) = #{thisMasterSlot}
    order by process_instance_priority, id asc
    limit #{limit} offset #{offset}
</select>
```
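The `mod(id, #{masterCount}) = #{thisMasterSlot}` predicate splits the command table across masters by id. The following is a minimal in-memory sketch of that partitioning, not code from this PR: the class name `SlotPartitionSketch` and the method `idsForSlot` are hypothetical, and it assumes positive ids so that Java's `%` matches SQL `mod`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the slot predicate used in queryCommandPageBySlot.
class SlotPartitionSketch {

    // Keeps the ids for which id mod masterCount equals thisMasterSlot.
    // Assumes ids are positive, as auto-increment primary keys normally are.
    static List<Integer> idsForSlot(List<Integer> ids, int masterCount, int thisMasterSlot) {
        List<Integer> picked = new ArrayList<>();
        for (int id : ids) {
            if (id % masterCount == thisMasterSlot) {
                picked.add(id);
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        int masterCount = 3;
        // Each id lands in exactly one slot, so no two masters pick the same command.
        for (int slot = 0; slot < masterCount; slot++) {
            System.out.println("slot " + slot + " -> " + idsForSlot(ids, masterCount, slot));
        }
    }
}
```

With three masters and ids 1 to 7, slot 0 receives 3 and 6, slot 1 receives 1, 4 and 7, and slot 2 receives 2 and 5.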
### 3.2 dolphinscheduler-dao module: update ResourceMapper.xml, id=baseSqlV2
```xml
<sql id="baseSqlV2">
    ${alias}.id, ${alias}.alias, ${alias}.file_name, ${alias}.description,
    ${alias}.user_id, ${alias}.type, ${alias}."size", ${alias}.create_time,
    ${alias}.update_time,
    ${alias}.pid, ${alias}.full_name, ${alias}.is_directory
</sql>
```
## 4. Data source: add support for the Xugu database
### 4.1 dolphinscheduler-datasource-plugin
#### 4.1.1 dolphinscheduler-datasource-plugin: add the dolphinscheduler-datasource-xugu module
##### 4.1.1.1 dolphinscheduler-datasource-xugu: add the param package
The package contains the following three classes, modeled on the MySQL plugin:
org.apache.dolphinscheduler.plugin.datasource.xugu.param.XuguConnectionParam
org.apache.dolphinscheduler.plugin.datasource.xugu.param.XuguDataSourceParamDTO
org.apache.dolphinscheduler.plugin.datasource.xugu.param.XuguDataSourceProcessor
```java
package org.apache.dolphinscheduler.plugin.datasource.xugu.param;

import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;

public class XuguConnectionParam extends BaseConnectionParam {

    @Override
    public String toString() {
        return "XuguConnectionParam{"
                + "user='" + user + '\''
                + ", password='" + password + '\''
                + ", address='" + address + '\''
                + ", database='" + database + '\''
                + ", jdbcUrl='" + jdbcUrl + '\''
                + ", driverLocation='" + driverLocation + '\''
                + ", driverClassName='" + driverClassName + '\''
                + ", validationQuery='" + validationQuery + '\''
                + ", other='" + other + '\''
                + '}';
    }
}
```
```java
package org.apache.dolphinscheduler.plugin.datasource.xugu.param;

import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.spi.enums.DbType;

public class XuguDataSourceParamDTO extends BaseDataSourceParamDTO {

    @Override
    public String toString() {
        return "XuguDataSourceParamDTO{"
                + "name='" + name + '\''
                + ", note='" + note + '\''
                + ", host='" + host + '\''
                + ", port=" + port
                + ", database='" + database + '\''
                + ", userName='" + userName + '\''
                + ", password='" + password + '\''
                + ", other='" + other + '\''
                + '}';
    }

    @Override
    public DbType getType() {
        return DbType.XUGU;
    }
}
```
```java
package org.apache.dolphinscheduler.plugin.datasource.xugu.param;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class XuguDataSourceProcessor extends AbstractDataSourceProcessor {

    @Override
    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
        return JSONUtils.parseObject(paramJson, XuguDataSourceParamDTO.class);
    }

    @Override
    public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
        XuguConnectionParam connectionParams = (XuguConnectionParam) createConnectionParams(connectionJson);
        XuguDataSourceParamDTO xuguDatasourceParamDTO = new XuguDataSourceParamDTO();

        xuguDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
        xuguDatasourceParamDTO.setUserName(connectionParams.getUser());
        xuguDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));

        // Recover host and port from an address of the form jdbc:xugu://host:port
        String address = connectionParams.getAddress();
        String[] hostSeperator = address.split(Constants.DOUBLE_SLASH);
        String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA);
        xuguDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1]));
        xuguDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
        return xuguDatasourceParamDTO;
    }

    @Override
    public BaseConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
        XuguDataSourceParamDTO xuguParam = (XuguDataSourceParamDTO) datasourceParam;
        String address;
        String jdbcUrl;
        address = String.format("%s%s:%s",
                DataSourceConstants.JDBC_XUGU, xuguParam.getHost(), xuguParam.getPort());
        jdbcUrl = address + "/" + xuguParam.getDatabase();

        XuguConnectionParam xuguConnectionParam = new XuguConnectionParam();
        xuguConnectionParam.setUser(xuguParam.getUserName());
        xuguConnectionParam.setPassword(PasswordUtils.encodePassword(xuguParam.getPassword()));
        xuguConnectionParam.setAddress(address);
        xuguConnectionParam.setJdbcUrl(jdbcUrl);
        xuguConnectionParam.setDatabase(xuguParam.getDatabase());
        xuguConnectionParam.setDriverClassName(getDatasourceDriver());
        xuguConnectionParam.setValidationQuery(getValidationQuery());
        xuguConnectionParam.setOther(transformOther(xuguParam.getOther()));
        xuguConnectionParam.setProps(xuguParam.getOther());
        return xuguConnectionParam;
    }

    @Override
    public ConnectionParam createConnectionParams(String connectionJson) {
        return JSONUtils.parseObject(connectionJson, XuguConnectionParam.class);
    }

    @Override
    public String getDatasourceDriver() {
        return DataSourceConstants.COM_XUGU_JDBC_DRIVER;
    }

    @Override
    public String getValidationQuery() {
        return DataSourceConstants.XUGU_VALIDATION_QUERY;
    }

    @Override
    public String getJdbcUrl(ConnectionParam connectionParam) {
        XuguConnectionParam xuguConnectionParam = (XuguConnectionParam) connectionParam;
        if (!StringUtils.isEmpty(xuguConnectionParam.getOther())) {
            return String.format("%s?%s", xuguConnectionParam.getJdbcUrl(), xuguConnectionParam.getOther());
        }
        return xuguConnectionParam.getJdbcUrl();
    }

    @Override
    public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException {
        XuguConnectionParam xuguConnectionParam = (XuguConnectionParam) connectionParam;
        Class.forName(getDatasourceDriver());
        return DriverManager.getConnection(getJdbcUrl(connectionParam),
                xuguConnectionParam.getUser(),
                PasswordUtils.decodePassword(xuguConnectionParam.getPassword()));
    }

    @Override
    public DbType getDbType() {
        return DbType.XUGU;
    }

    @Override
    public DataSourceProcessor create() {
        return new XuguDataSourceProcessor();
    }

    // Serializes the extra parameters as key=value pairs joined by '&'
    private String transformOther(Map<String, String> otherMap) {
        if (MapUtils.isEmpty(otherMap)) {
            return null;
        }
        List<String> list = new ArrayList<>();
        otherMap.forEach((key, value) -> list.add(String.format("%s=%s", key, value)));
        return String.join("&", list);
    }

    // Inverse of transformOther: parses key=value pairs joined by '&'
    private Map<String, String> parseOther(String other) {
        if (StringUtils.isEmpty(other)) {
            return null;
        }
        Map<String, String> otherMap = new LinkedHashMap<>();
        String[] configs = other.split("&");
        for (String config : configs) {
            otherMap.put(config.split("=")[0], config.split("=")[1]);
        }
        return otherMap;
    }
}
```
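The processor above builds an address of the form `jdbc:xugu://host:port`, appends the database name to get the JDBC URL, and later splits the address again to recover host and port. The sketch below replays that round trip with plain strings so it can be run without DolphinScheduler on the classpath. The class `XuguUrlSketch` and its method names are hypothetical, and it assumes that `Constants.DOUBLE_SLASH`, `Constants.COMMA` and `Constants.COLON` are `"//"`, `","` and `":"`. Unlike `parseOther` in the PR, this version tolerates a key without a value, which is a deliberate deviation rather than the PR's behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical, dependency-free illustration of the URL handling in XuguDataSourceProcessor.
class XuguUrlSketch {

    static final String JDBC_XUGU = "jdbc:xugu://";

    // Address without the database part, e.g. jdbc:xugu://127.0.0.1:5138
    static String buildAddress(String host, int port) {
        return String.format("%s%s:%d", JDBC_XUGU, host, port);
    }

    // Full JDBC URL: address + "/" + database
    static String buildJdbcUrl(String host, int port, String database) {
        return buildAddress(host, port) + "/" + database;
    }

    // Returns {host, port} parsed back from an address built by buildAddress.
    static String[] parseHostPort(String address) {
        String[] afterScheme = address.split("//");
        String firstHost = afterScheme[afterScheme.length - 1].split(",")[0];
        return firstHost.split(":");
    }

    // Joins extra parameters as key=value pairs separated by '&'.
    static String joinOther(Map<String, String> other) {
        StringJoiner joiner = new StringJoiner("&");
        other.forEach((key, value) -> joiner.add(key + "=" + value));
        return joiner.toString();
    }

    // Parses key=value pairs; a key with no '=' maps to an empty string.
    static Map<String, String> parseOther(String other) {
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String pair : other.split("&")) {
            String[] kv = pair.split("=", 2);
            parsed.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return parsed;
    }

    public static void main(String[] args) {
        String address = buildAddress("127.0.0.1", 5138);
        System.out.println(buildJdbcUrl("127.0.0.1", 5138, "dolphinscheduler"));
        String[] hostPort = parseHostPort(address);
        System.out.println(hostPort[0] + " " + hostPort[1]);
        // k1 and k2 are placeholder parameter names, not real Xugu driver options.
        System.out.println(joinOther(parseOther("k1=v1&k2=v2")));
    }
}
```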
##### 4.1.1.2 dolphinscheduler-datasource-xugu: add XuguDataSourceChannel, XuguDataSourceChannelFactory, XuguDataSourceClient (modeled on the MySQL plugin)
These implement or extend, respectively:
org.apache.dolphinscheduler.spi.datasource.DataSourceChannel
org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
```java
package org.apache.dolphinscheduler.plugin.datasource.xugu;

import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;

public class XuguDataSourceChannel implements DataSourceChannel {

    @Override
    public DataSourceClient createDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
        return new XuguDataSourceClient(baseConnectionParam, dbType);
    }
}
```
```java
package org.apache.dolphinscheduler.plugin.datasource.xugu;

import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;

import com.google.auto.service.AutoService;

@AutoService(DataSourceChannelFactory.class)
public class XuguDataSourceChannelFactory implements DataSourceChannelFactory {

    @Override
    public String getName() {
        return "xugu";
    }

    @Override
    public DataSourceChannel create() {
        return new XuguDataSourceChannel();
    }
}
```
```java
package org.apache.dolphinscheduler.plugin.datasource.xugu;

import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;

public class XuguDataSourceClient extends CommonDataSourceClient {

    public XuguDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
        super(baseConnectionParam, dbType);
    }
}
```
#### 4.1.3 dolphinscheduler-datasource-plugin/pom.xml
Register the new module in dolphinscheduler-datasource-plugin/pom.xml:
```xml
<module>dolphinscheduler-datasource-xugu</module>
```
#### 4.1.4 DataSourceConstants.java
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/DataSourceConstants.java
```
public static final String COM_XUGU_JDBC_DRIVER = "com.xugu.xugu-jdbc.Driver";
```
```
public static final String XUGU_VALIDATION_QUERY = "select 1 from dual";
```
```
public static final String JDBC_XUGU = "jdbc:xugu://";
```
#### 4.1.5 DbType.java
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
```
XUGU(11,"xugu"),
```
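The new enum entry pairs a numeric code with a lowercase name, so code that stores or receives the number can map it back to the type. The following is a minimal standalone sketch of that pattern, not the real `DbType`: the enum `DbTypeSketch`, its `MYSQL(0, "mysql")` entry, and the `ofCode` helper are illustrative assumptions, and only the `XUGU(11, "xugu")` pair is taken from this PR.

```java
import java.util.Optional;

// Hypothetical stand-in for DbType, reduced to two entries for illustration.
enum DbTypeSketch {
    MYSQL(0, "mysql"),   // illustrative entry, assumed for the example
    XUGU(11, "xugu");    // code and name as added in this PR

    private final int code;
    private final String descp;

    DbTypeSketch(int code, String descp) {
        this.code = code;
        this.descp = descp;
    }

    int getCode() {
        return code;
    }

    String getDescp() {
        return descp;
    }

    // Looks up a type by its numeric code; empty when the code is unknown.
    static Optional<DbTypeSketch> ofCode(int code) {
        for (DbTypeSketch type : values()) {
            if (type.code == code) {
                return Optional.of(type);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(ofCode(11).map(DbTypeSketch::getDescp).orElse("unknown"));
        System.out.println(ofCode(99).map(DbTypeSketch::getDescp).orElse("unknown"));
    }
}
```

Returning an `Optional` rather than `null` makes the unknown-code case explicit at the call site.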
#### 4.1.6 XuguSQLUpgradeDao.java
Adds initialization of the table structure so that an empty Xugu database can be set up.
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/XuguSQLUpgradeDao.java
```java
package org.apache.dolphinscheduler.tools.datasource.dao;

import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

@Service
public class XuguSQLUpgradeDao extends UpgradeDao {

    public static final Logger logger = LoggerFactory.getLogger(XuguSQLUpgradeDao.class);

    private XuguSQLUpgradeDao(DataSource dataSource) {
        super(dataSource);
    }

    @Override
    protected String initSqlPath() {
        return "dolphinscheduler_xugu.sql";
    }

    @Override
    public DbType getDbType() {
        return DbType.XUGU;
    }

    public String getSchema() {
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet resultSet = null;
        try {
            conn = dataSource.getConnection();
            pstmt = conn.prepareStatement("select current_schema()");
            resultSet = pstmt.executeQuery();
            while (resultSet.next()) {
                if (resultSet.isFirst()) {
                    return resultSet.getString(1);
                }
            }
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
        } finally {
            ConnectionUtils.releaseResource(resultSet, pstmt, conn);
        }
        return "";
    }

    /**
     * determines whether a table exists
     *
     * @param tableName tableName
     * @return true if the table exists, otherwise false
     */
    @Override
    public boolean isExistsTable(String tableName) {
        Connection conn = null;
        ResultSet rs = null;
        try {
            conn = dataSource.getConnection();
            rs = conn.getMetaData().getTables(conn.getCatalog(), getSchema(), tableName, null);
            return rs.next();
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            ConnectionUtils.releaseResource(rs, conn);
        }
    }

    /**
     * determines whether a field exists in the specified table
     *
     * @param tableName tableName
     * @param columnName columnName
     * @return true if the column exists, otherwise false
     */
    @Override
    public boolean isExistsColumn(String tableName, String columnName) {
        Connection conn = null;
        ResultSet rs = null;
        try {
            conn = dataSource.getConnection();
            rs = conn.getMetaData().getColumns(conn.getCatalog(), getSchema(), tableName, columnName);
            return rs.next();
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            ConnectionUtils.releaseResource(rs, conn);
        }
    }
}
```
## 5. Front-end modification
Add the `XUGU` entry to each of the following four TypeScript files:
dolphinscheduler-ui/src/service/modules/data-source/types.ts
```ts
type IDataBase =
  | 'MYSQL'
  | 'XUGU'
  | 'POSTGRESQL'
  | 'HIVE'
  | 'SPARK'
```
dolphinscheduler-ui/src/views/datasource/list/use-form.ts
```ts
XUGU: {
  value: 'XUGU',
  label: 'XUGU',
  defaultPort: 5138
},
```
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
```ts
{
  id: 10,
  code: 'XUGU',
  disabled: false
}
```
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts
```ts
const supportedDatasourceType = [
  'MYSQL',
  'POSTGRESQL',
  'ORACLE',
  'SQLSERVER',
  'CLICKHOUSE',
  'DATABEND',
  'HIVE',
  'PRESTO',
  'XUGU'
]
```
## Verify this pull request