This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ae40b5c9f [INLONG-6938][Manager] Supports to create a starRocks
database or table (#6939)
ae40b5c9f is described below
commit ae40b5c9f2200c900b728764429fa7df1613735a
Author: fuweng11 <[email protected]>
AuthorDate: Mon Dec 19 16:29:41 2022 +0800
[INLONG-6938][Manager] Supports to create a starRocks database or table
(#6939)
---
.../pojo/sink/starrocks/StarRocksColumnInfo.java | 67 +++++++
.../manager/pojo/sink/starrocks/StarRocksSink.java | 9 +
.../pojo/sink/starrocks/StarRocksSinkDTO.java | 39 +++-
.../pojo/sink/starrocks/StarRocksSinkRequest.java | 10 +
.../pojo/sink/starrocks/StarRocksTableInfo.java | 46 +++++
.../sink/starrocks/StarRocksJdbcUtils.java | 217 +++++++++++++++++++++
.../sink/starrocks/StarRocksResourceOperator.java | 134 +++++++++++++
.../sink/starrocks/StarRocksSqlBuilder.java | 217 +++++++++++++++++++++
.../sink/starrocks/StarRocksSinkOperator.java | 73 ++++++-
9 files changed, 807 insertions(+), 5 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java
new file mode 100644
index 000000000..ba0a3731f
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.starrocks;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * StarRocks column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.STARROCKS)
+public class StarRocksColumnInfo extends SinkField {
+
+ private Boolean isSortKey = false;
+
+ private Boolean isDistributed = false;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static StarRocksColumnInfo getFromRequest(SinkField sinkField) {
+ return CommonBeanUtils.copyProperties(sinkField,
StarRocksColumnInfo::new, true);
+ }
+
+ /**
+ * Get the extra param from the Json
+ */
+ public static StarRocksColumnInfo getFromJson(@NotNull String extParams) {
+ if (StringUtils.isEmpty(extParams)) {
+ return new StarRocksColumnInfo();
+ }
+ try {
+ return JsonUtils.parseObject(extParams, StarRocksColumnInfo.class);
+ } catch (Exception e) {
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ }
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
index 64820cbf3..1afaf389f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
@@ -75,6 +75,15 @@ public class StarRocksSink extends StreamSink {
@ApiModelProperty("The multiple table-pattern of sink")
private String tablePattern;
+ @ApiModelProperty("The table engine, like: OLAP, MYSQL, ELASTICSEARCH,
etc, default is OLAP")
+ private String tableEngine;
+
+ @ApiModelProperty("The table replication num")
+ private Integer replicationNum;
+
+ @ApiModelProperty("The table barrel size")
+ private Integer barrelSize;
+
public StarRocksSink() {
this.setSinkType(SinkType.STARROCKS);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
index 36496bbb8..9b627c400 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
@@ -18,9 +18,6 @@
package org.apache.inlong.manager.pojo.sink.starrocks;
import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -31,6 +28,11 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.AESUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
+import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
/**
* Sink info of StarRocks
*/
@@ -73,6 +75,15 @@ public class StarRocksSinkDTO {
@ApiModelProperty("The multiple table-pattern of sink")
private String tablePattern;
+ @ApiModelProperty("The table engine, like: OLAP, MYSQL, ELASTICSEARCH,
etc, default is OLAP")
+ private String tableEngine = "OLAP";
+
+ @ApiModelProperty("The table replication num")
+ private Integer replicationNum = 3;
+
+ @ApiModelProperty("The table barrel size")
+ private Integer barrelSize = 8;
+
@ApiModelProperty("Password encrypt version")
private Integer encryptVersion;
@@ -100,6 +111,9 @@ public class StarRocksSinkDTO {
.sinkMultipleFormat(request.getSinkMultipleFormat())
.databasePattern(request.getDatabasePattern())
.tablePattern(request.getTablePattern())
+ .tableEngine(request.getTableEngine())
+ .replicationNum(request.getReplicationNum())
+ .barrelSize(request.getBarrelSize())
.encryptVersion(encryptVersion)
.properties(request.getProperties())
.build();
@@ -113,6 +127,25 @@ public class StarRocksSinkDTO {
}
}
+ /**
+ * Get StarRocks table info
+ *
+ * @param sinkDTO StarRocks sink dto, {@link StarRocksSinkDTO}
+ * @param columnList StarRocks column info list, {@link StarRocksColumnInfo}
+ * @return {@link StarRocksTableInfo}
+ */
+ public static StarRocksTableInfo getTableInfo(StarRocksSinkDTO sinkDTO,
List<StarRocksColumnInfo> columnList) {
+ StarRocksTableInfo tableInfo = new StarRocksTableInfo();
+ tableInfo.setDbName(sinkDTO.getDatabaseName());
+ tableInfo.setTableName(sinkDTO.getTableName());
+ tableInfo.setColumns(columnList);
+ tableInfo.setPrimaryKey(sinkDTO.getPrimaryKey());
+ tableInfo.setTableEngine(sinkDTO.getTableEngine());
+ tableInfo.setReplicationNum(sinkDTO.getReplicationNum());
+ tableInfo.setBarrelSize(sinkDTO.getBarrelSize());
+ return tableInfo;
+ }
+
private StarRocksSinkDTO decryptPassword() throws Exception {
if (StringUtils.isNotEmpty(this.password)) {
byte[] passwordBytes = AESUtils.decryptAsString(this.password,
this.encryptVersion);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
index 1956021ef..8c4bb74cb 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
@@ -68,4 +68,14 @@ public class StarRocksSinkRequest extends SinkRequest {
@ApiModelProperty("The multiple table-pattern of sink")
private String tablePattern;
+
+ @ApiModelProperty("The table engine, like: OLAP, MYSQL, ELASTICSEARCH,
etc, default is OLAP")
+ private String tableEngine = "OLAP";
+
+ @ApiModelProperty("The table replication num")
+ private Integer replicationNum = 3;
+
+ @ApiModelProperty("The table barrel size")
+ private Integer barrelSize = 8;
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java
new file mode 100644
index 000000000..318eec070
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.starrocks;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * StarRocks table info.
+ */
+@Data
+public class StarRocksTableInfo {
+
+ private String dbName;
+
+ private String tableName;
+
+ private String comment;
+
+ private String primaryKey;
+
+ private String tableEngine;
+
+ private Integer replicationNum;
+
+ private Integer barrelSize;
+
+ private List<StarRocksColumnInfo> columns;
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
new file mode 100644
index 000000000..b69809a41
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.starrocks;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hive.jdbc.HiveDatabaseMetaData;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class StarRocksJdbcUtils {
+
+ private static final String STAR_ROCKS_DRIVER_CLASS =
"com.mysql.cj.jdbc.Driver";
+ private static final String METADATA_TYPE = "TABLE";
+ private static final String COLUMN_LABEL = "TABLE_NAME";
+ private static final String STAR_ROCKS_JDBC_PREFIX = "jdbc:mysql";
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StarRocksJdbcUtils.class);
+
+ /**
+ * Get starRocks connection from starRocks url and user
+ */
+ public static Connection getConnection(String url, String user, String
password) throws Exception {
+ if (StringUtils.isBlank(url) ||
!url.startsWith(STAR_ROCKS_JDBC_PREFIX)) {
+ throw new Exception("starRocks server url should start with " +
STAR_ROCKS_JDBC_PREFIX);
+ }
+ Connection conn;
+ try {
+ Class.forName(STAR_ROCKS_DRIVER_CLASS);
+ conn = DriverManager.getConnection(url, user, password);
+ LOGGER.info("get star rocks connection success, url={}", url);
+ return conn;
+ } catch (Exception e) {
+ String errMsg = "get star rocks connection error, please check
starRocks jdbc url, username or password";
+ LOGGER.error(errMsg, e);
+ throw new Exception(errMsg + ", error: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Execute sql on the specified starRocks Server
+ *
+ * @param sql need to execute
+ * @param url url of starRocks server
+ * @param user user of starRocks server
+ * @param password password of starRocks server
+ * @throws Exception when executing error
+ */
+ public static void executeSql(String sql, String url, String user, String
password) throws Exception {
+ try (Connection conn = getConnection(url, user, password);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ LOGGER.info("execute sql [{}] success", sql);
+ }
+ }
+
+ /**
+ * Create StarRocks database
+ */
+ public static void createDb(String url, String username, String password,
String dbName) throws Exception {
+ String createDbSql = StarRocksSqlBuilder.buildCreateDbSql(dbName);
+ executeSql(createDbSql, url, username, password);
+ }
+
+ /**
+ * Create StarRocks table
+ */
+ public static void createTable(String url, String username, String
password, StarRocksTableInfo tableInfo)
+ throws Exception {
+ if (checkTablesExist(url, username, password, tableInfo.getDbName(),
tableInfo.getTableName())) {
+ LOGGER.info("The table [{}] are exists", tableInfo.getTableName());
+ } else {
+ String createTableSql =
StarRocksSqlBuilder.buildCreateTableSql(tableInfo);
+ StarRocksJdbcUtils.executeSql(createTableSql, url, username,
password);
+ LOGGER.info("execute sql [{}] success", createTableSql);
+ }
+ }
+
+ /**
+ * Get the table names of the given database from the JDBC connection metadata
+ */
+ public static List<String> getTables(String url, String user, String
password, String dbName) throws Exception {
+ try (Connection conn = getConnection(url, user, password)) {
+ HiveDatabaseMetaData metaData = (HiveDatabaseMetaData)
conn.getMetaData();
+ ResultSet rs = metaData.getTables(dbName, dbName, null, new
String[]{METADATA_TYPE});
+ List<String> tables = new ArrayList<>();
+ while (rs.next()) {
+ String tableName = rs.getString(COLUMN_LABEL);
+ tables.add(tableName);
+ }
+ rs.close();
+
+ return tables;
+ }
+ }
+
+ /**
+ * Add columns for StarRocks table
+ */
+ public static void addColumns(String url, String user, String password,
String dbName, String tableName,
+ List<StarRocksColumnInfo> columnList) throws Exception {
+ final List<StarRocksColumnInfo> columnInfos = Lists.newArrayList();
+
+ for (StarRocksColumnInfo columnInfo : columnList) {
+ if (!checkColumnExist(url, user, password, dbName, tableName,
columnInfo.getFieldName())) {
+ columnInfos.add(columnInfo);
+ }
+ }
+ List<String> addColumnSql =
StarRocksSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos);
+ try (Connection conn = getConnection(url, user, password)) {
+ executeSqlBatch(conn, addColumnSql);
+ }
+ }
+
+ /**
+ * Check tables from the StarRocks information_schema.
+ *
+ * @param url jdbcUrl
+ * @param user username
+ * @param password password
+ * @param dbName database name
+ * @param tableName table name
+ * @return true if table exist, otherwise false
+ * @throws Exception on check table exist error
+ */
+ public static boolean checkTablesExist(String url, String user, String
password, String dbName,
+ String tableName) throws Exception {
+ boolean result = false;
+ final String checkTableSql = StarRocksSqlBuilder.getCheckTable(dbName,
tableName);
+ try (Connection conn = getConnection(url, user, password);
+ Statement stmt = conn.createStatement()) {
+ ResultSet resultSet = stmt.executeQuery(checkTableSql);
+ if (Objects.nonNull(resultSet)) {
+ if (resultSet.next()) {
+ result = true;
+ }
+ }
+ }
+ LOGGER.info("check table exist for db={} table={}, result={}", dbName,
tableName, result);
+ return result;
+ }
+
+ /**
+ * Check whether the column exists in the StarRocks table.
+ *
+ * @param url jdbcUrl
+ * @param user username
+ * @param password password
+ * @param dbName database name
+ * @param tableName table name
+ * @param column table column name
+ * @return true if column exist in the table, otherwise false
+ * @throws Exception on check column exist error
+ */
+ public static boolean checkColumnExist(String url, String user, String
password, String dbName,
+ final String tableName, final String column) throws Exception {
+ boolean result = false;
+ final String checkTableSql =
StarRocksSqlBuilder.getCheckColumn(dbName, tableName, column);
+ try (Connection conn = getConnection(url, user, password);
+ Statement stmt = conn.createStatement();
+ ResultSet resultSet = stmt.executeQuery(checkTableSql)) {
+ if (Objects.nonNull(resultSet)) {
+ if (resultSet.next()) {
+ result = true;
+ }
+ }
+ }
+ LOGGER.info("check column exist for db={} table={}, result={}
column={}", dbName, tableName, result, column);
+ return result;
+ }
+
+ /**
+ * Execute a batch of SQL statements on StarRocks.
+ *
+ * @param conn JDBC {@link Connection}
+ * @param sqls SQL to be executed
+ * @throws Exception on SQL batch execution error
+ */
+ public static void executeSqlBatch(final Connection conn, final
List<String> sqls) throws Exception {
+ conn.setAutoCommit(false);
+ try (Statement stmt = conn.createStatement()) {
+ for (String entry : sqls) {
+ stmt.execute(entry);
+ }
+ conn.commit();
+ LOGGER.info("execute sql [{}] success", sqls);
+ } finally {
+ conn.setAutoCommit(true);
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
new file mode 100644
index 000000000..b657481b7
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.starrocks;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import
org.apache.inlong.manager.service.resource.sink.mysql.MySQLResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * StarRocks resource operator.
+ */
+@Service
+public class StarRocksResourceOperator implements SinkResourceOperator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySQLResourceOperator.class);
+
+ @Autowired
+ private StreamSinkService sinkService;
+
+ @Autowired
+ private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.STARROCKS.equals(sinkType);
+ }
+
+ @Override
+ public void createSinkResource(SinkInfo sinkInfo) {
+ LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
+ if
(SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+ LOG.warn("sink resource [" + sinkInfo.getId() + "] already
success, skip to create");
+ return;
+ } else if
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
{
+ LOG.warn("create resource was disabled, skip to create for [" +
sinkInfo.getId() + "]");
+ return;
+ }
+ this.createTable(sinkInfo);
+ }
+
+ /**
+ * Create starRocks table by SinkInfo.
+ *
+ * @param sinkInfo {@link SinkInfo}
+ */
+ private void createTable(SinkInfo sinkInfo) {
+ LOG.info("begin to create starRocks table for sinkId={}",
sinkInfo.getId());
+ List<StreamSinkFieldEntity> fieldList =
fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+ if (CollectionUtils.isEmpty(fieldList)) {
+ LOG.warn("no starRocks fields found, skip to create table for
sinkId={}", sinkInfo.getId());
+ }
+ // get columns
+ List<StarRocksColumnInfo> columnList =
getStarRocksColumnInfoFromSink(fieldList);
+
+ StarRocksSinkDTO sinkDTO =
StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams());
+ StarRocksTableInfo tableInfo = StarRocksSinkDTO.getTableInfo(sinkDTO,
columnList);
+ String url = sinkDTO.getJdbcUrl();
+ String username = sinkDTO.getUsername();
+ String password = sinkDTO.getPassword();
+ String dbName = sinkDTO.getDatabaseName();
+ String tableName = sinkDTO.getTableName();
+ try {
+ // 1. create database if not exists
+ StarRocksJdbcUtils.createDb(url, username, password, dbName);
+ String dbUrl = url + "/" + dbName;
+ // 2. table not exists, create it
+ StarRocksJdbcUtils.createTable(dbUrl, username, password,
tableInfo);
+ // 3. table exists, add columns - skip the columns that already exist
+ StarRocksJdbcUtils.addColumns(dbUrl, username, password, dbName,
tableName, columnList);
+ // 4. update the sink status to success
+ String info = "success to create StarRocks resource";
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+ LOG.info(info + " for sinkInfo={}", sinkInfo);
+ } catch (Throwable e) {
+ String errMsg = "create StarRocks table failed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+ throw new WorkflowException(errMsg);
+ }
+ LOG.info("success create StarRocks table for data sink [" +
sinkInfo.getId() + "]");
+ }
+
+ public List<StarRocksColumnInfo>
getStarRocksColumnInfoFromSink(List<StreamSinkFieldEntity> sinkList) {
+ List<StarRocksColumnInfo> columnInfoList = new ArrayList<>();
+ for (StreamSinkFieldEntity fieldEntity : sinkList) {
+ if (StringUtils.isNotBlank(fieldEntity.getExtParams())) {
+ StarRocksColumnInfo starRocksColumnInfo =
StarRocksColumnInfo.getFromJson(
+ fieldEntity.getExtParams());
+ CommonBeanUtils.copyProperties(fieldEntity,
starRocksColumnInfo, true);
+ columnInfoList.add(starRocksColumnInfo);
+ } else {
+ StarRocksColumnInfo starRocksColumnInfo = new
StarRocksColumnInfo();
+ CommonBeanUtils.copyProperties(fieldEntity,
starRocksColumnInfo, true);
+ columnInfoList.add(starRocksColumnInfo);
+ }
+ }
+ return columnInfoList;
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java
new file mode 100644
index 000000000..ad5271be1
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.starrocks;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.apache.inlong.manager.service.resource.sink.hive.SqlBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Builder for SQL string
+ */
+public class StarRocksSqlBuilder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SqlBuilder.class);
+
+ /**
+ * Build create database SQL
+ */
+ public static String buildCreateDbSql(String dbName) {
+ // Backquote the name to support database names beginning with an underscore
+ String sql = "CREATE DATABASE IF NOT EXISTS `" + dbName + "`";
+ LOGGER.info("create db sql: {}", sql);
+ return sql;
+ }
+
+ /**
+ * Build create table SQL
+ */
+ public static String buildCreateTableSql(StarRocksTableInfo table) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("CREATE TABLE ").append(table.getTableName());
+ // Construct columns, sort key and distribution clauses
+ sql.append(getColumnsAndComments(table));
+ if (!StringUtils.isEmpty(table.getPrimaryKey())) {
+ sql.append(", PRIMARY KEY (")
+ .append(table.getPrimaryKey())
+ .append(")");
+ }
+ if (!Objects.isNull(table.getReplicationNum())) {
+ sql.append(" PROPERTIES ( \"replication_num\" = \"")
+ .append(table.getReplicationNum())
+ .append("\")");
+ }
+
+ LOGGER.info("create table sql: {}", sql);
+ return sql.toString();
+ }
+
+ /**
+ * Build describe table SQL
+ */
+ public static String buildDescTableSql(String dbName, String tableName) {
+ StringBuilder sql = new StringBuilder();
+ String fullTableName = "`" + dbName + "." + tableName + "`";
+ sql.append("DESC ").append(fullTableName);
+
+ LOGGER.info("desc table sql={}", sql);
+ return sql.toString();
+ }
+
+ /**
+ * Build add columns SQL.
+ *
+ * @param tableName StarRocks table name
+ * @param columnList StarRocks column list {@link List}
+ * @return add column SQL string list
+ */
+ public static List<String> buildAddColumnsSql(String dbName, String
tableName,
+ List<StarRocksColumnInfo> columnList) {
+ final List<String> columnInfoList = getColumnsInfo(columnList);
+ final List<String> resultList = new ArrayList<>();
+ final StringBuilder sqlBuilder = new StringBuilder();
+ columnInfoList.forEach(columnInfo -> {
+ sqlBuilder.append("ALTER TABLE ")
+ .append(dbName)
+ .append(".")
+ .append(tableName)
+ .append(" ADD COLUMN ")
+ .append(columnInfo)
+ .append(";");
+ resultList.add(sqlBuilder.toString());
+ sqlBuilder.delete(0, sqlBuilder.length());
+ });
+ return resultList;
+ }
+
+ /**
+ * Build column info by StarRocksColumnInfo list.
+ *
+ * @param columns StarRocks column info {@link StarRocksColumnInfo} list
+ * @return the SQL list
+ */
+ private static List<String> getColumnsInfo(List<StarRocksColumnInfo>
columns) {
+ final List<String> columnList = new ArrayList<>();
+ final StringBuilder columnBuilder = new StringBuilder();
+ columns.forEach(columnInfo -> {
+ columnBuilder.append("`")
+ .append(columnInfo.getFieldName())
+ .append("`")
+ .append(" ")
+ .append(columnInfo.getFieldType());
+ if (!StringUtils.isEmpty(columnInfo.getFieldComment())) {
+ columnBuilder.append(" COMMENT '")
+ .append(columnInfo.getFieldComment())
+ .append("'");
+ }
+ columnBuilder.append(" ");
+ columnList.add(columnBuilder.toString());
+ columnBuilder.delete(0, columnBuilder.length());
+ });
+ return columnList;
+ }
+
+ /**
+ * Get the columns, sort key and distribution string for create table SQL.
+ *
+ * For example: col_name data_type [COMMENT col_comment], col_name
data_type [COMMENT col_comment]....
+ */
+ private static String getColumnsAndComments(StarRocksTableInfo tableInfo) {
+ List<StarRocksColumnInfo> columns = tableInfo.getColumns();
+ List<String> columnList = new ArrayList<>();
+ List<String> sortKeyList = new ArrayList<>();
+ List<String> distributeList = new ArrayList<>();
+ for (StarRocksColumnInfo columnInfo : columns) {
+ // Construct the column definition and collect sort key / distribution columns
+ StringBuilder columnStr = new
StringBuilder().append("`").append(columnInfo.getFieldName()).append("` ")
+ .append(columnInfo.getFieldType());
+ if (StringUtils.isNotEmpty(columnInfo.getFieldComment())) {
+ columnStr.append(" COMMENT
").append("'").append(columnInfo.getFieldComment()).append("'");
+ }
+
+ if (columnInfo.getIsDistributed()) {
+ distributeList.add("`" + columnInfo.getFieldName() + "` ");
+ }
+ if (columnInfo.getIsSortKey()) {
+ sortKeyList.add("`" + columnInfo.getFieldName() + "` ");
+ }
+ columnList.add(columnStr.toString());
+ }
+ StringBuilder result = new StringBuilder().append("
(").append(StringUtils.join(columnList, ",")).append(") ");
+ // set sort key and distribution
+ if (sortKeyList.size() > 0) {
+ result.append("DUPLICATE KEY
(").append(StringUtils.join(sortKeyList, ",")).append(") ");
+ }
+ if (distributeList.size() <= 0 && columns.size() > 0) {
+ distributeList.add("`" + columns.get(0).getFieldName() + "` ");
+ }
+ result.append("DISTRIBUTED BY HASH
(").append(StringUtils.join(distributeList, ",")).append(") ")
+ .append("BUCKETS ")
+ .append(tableInfo.getBarrelSize());
+ return result.toString();
+ }
+
+ /**
+ * Build SQL to check whether the table exists.
+ *
+ * @param dbName StarRocks database name
+ * @param tableName StarRocks table name
+ * @return the check table SQL string
+ */
+ public static String getCheckTable(String dbName, String tableName) {
+ final StringBuilder sqlBuilder = new StringBuilder()
+ .append("select table_schema,table_name ")
+ .append(" from information_schema.tables where table_schema =
'")
+ .append(dbName)
+ .append("' and table_name = '")
+ .append(tableName)
+ .append("' ;");
+ LOGGER.info("check table sql: {}", sqlBuilder);
+ return sqlBuilder.toString();
+ }
+
+ /**
+ * Build SQL to check whether the column exists.
+ *
+ * @param dbName StarRocks database name
+ * @param tableName StarRocks table name
+ * @param columnName StarRocks column name
+ * @return the check column SQL string
+ */
+ public static String getCheckColumn(String dbName, String tableName,
String columnName) {
+ final StringBuilder sqlBuilder = new StringBuilder()
+ .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ")
+ .append(" from information_schema.COLUMNS where
table_schema='")
+ .append(dbName)
+ .append("' and table_name = '")
+ .append(tableName)
+ .append("' and column_name = '")
+ .append(columnName)
+ .append("';");
+ LOGGER.info("check table sql: {}", sqlBuilder);
+ return sqlBuilder.toString();
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index 5d8785620..e9ff58717 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -18,17 +18,20 @@
package org.apache.inlong.manager.service.sink.starrocks;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.List;
-import javax.validation.constraints.NotNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO;
import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkRequest;
@@ -38,6 +41,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* StarRocks sink operator, such as save or update StarRocks field, etc.
*/
@@ -90,4 +97,66 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
return sink;
}
+ /**
+  * Save the field list carried by a StarRocks sink request.
+  * Each field is checked, copied into a StreamSinkFieldEntity, and stamped with
+  * the group id, stream id, sink type and sink id of the request. An empty field
+  * comment falls back to the field name. The StarRocksColumnInfo derived from the
+  * field is stored as a JSON string in the entity's extParams. All entities are
+  * inserted in one batch; nothing is written when the field list is empty.
+  *
+  * @param request sink request holding the fields to save
+  * @throws BusinessException with the SINK_SAVE_FAILED message if the column info
+  *         cannot be built or serialized
+  */
+ @Override
+ public void saveFieldOpt(SinkRequest request) {
+     List<SinkField> fieldList = request.getSinkFieldList();
+     LOGGER.info("begin to save starRocks sink fields={}", fieldList);
+     if (CollectionUtils.isEmpty(fieldList)) {
+         return;
+     }
+
+     int size = fieldList.size();
+     List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+     String groupId = request.getInlongGroupId();
+     String streamId = request.getInlongStreamId();
+     String sinkType = request.getSinkType();
+     Integer sinkId = request.getId();
+     for (SinkField fieldInfo : fieldList) {
+         this.checkFieldInfo(fieldInfo);
+         StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+         // Use the field name as the comment when none was supplied.
+         if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+             fieldEntity.setFieldComment(fieldEntity.getFieldName());
+         }
+         try {
+             // Keep the StarRocks-specific column attributes as JSON in extParams.
+             StarRocksColumnInfo dto = StarRocksColumnInfo.getFromRequest(fieldInfo);
+             fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+         } catch (Exception e) {
+             LOGGER.error("parsing json string to sink field info failed", e);
+             throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+         }
+         fieldEntity.setInlongGroupId(groupId);
+         fieldEntity.setInlongStreamId(streamId);
+         fieldEntity.setSinkType(sinkType);
+         fieldEntity.setSinkId(sinkId);
+         fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+         entityList.add(fieldEntity);
+     }
+
+     sinkFieldMapper.insertAll(entityList);
+     LOGGER.info("success to save starRocks sink fields");
+ }
+
+ /**
+  * Load the fields stored for the given sink.
+  * An entity with non-blank extParams is rebuilt as a StarRocksColumnInfo from
+  * that JSON; any other entity becomes a plain SinkField. In both cases the
+  * entity's properties are then copied onto the result object.
+  *
+  * @param sinkId id of the sink whose fields are queried
+  * @return the sink fields, or an empty list when none are stored
+  */
+ @Override
+ public List<SinkField> getSinkFields(Integer sinkId) {
+     List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId);
+     List<SinkField> fieldList = new ArrayList<>();
+     if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+         return fieldList;
+     }
+     for (StreamSinkFieldEntity field : sinkFieldEntities) {
+         if (StringUtils.isNotBlank(field.getExtParams())) {
+             StarRocksColumnInfo starRocksColumnInfo = StarRocksColumnInfo.getFromJson(field.getExtParams());
+             CommonBeanUtils.copyProperties(field, starRocksColumnInfo, true);
+             fieldList.add(starRocksColumnInfo);
+         } else {
+             // A plain SinkField is only needed when there is no StarRocks-specific JSON.
+             SinkField sinkField = new SinkField();
+             CommonBeanUtils.copyProperties(field, sinkField, true);
+             fieldList.add(sinkField);
+         }
+     }
+     return fieldList;
+ }
+
}