This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b18b51104b [INLONG-11764][Manager] Support SQL stream source (#11765)
b18b51104b is described below
commit b18b51104b6d31044bf53fc2e12b5150f385a6d0
Author: fuweng11 <[email protected]>
AuthorDate: Mon Feb 17 19:48:21 2025 +0800
[INLONG-11764][Manager] Support SQL stream source (#11765)
---
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../inlong/manager/common/consts/SourceType.java | 3 +-
.../manager/pojo/node/sql/SqlDataNodeDTO.java | 61 ++++++++++++++
.../manager/pojo/node/sql/SqlDataNodeInfo.java | 50 ++++++++++++
.../manager/pojo/node/sql/SqlDataNodeRequest.java | 39 +++++++++
.../manager/pojo/source/cos/COSSourceDTO.java | 2 -
.../pojo/source/sql/SqlDataAddTaskRequest.java | 36 +++++++++
.../inlong/manager/pojo/source/sql/SqlSource.java | 92 ++++++++++++++++++++++
.../COSSourceDTO.java => sql/SqlSourceDTO.java} | 70 +++++++---------
.../manager/pojo/source/sql/SqlSourceRequest.java | 74 +++++++++++++++++
.../service/core/impl/AgentServiceImpl.java | 16 ++--
.../service/node/mysql/MySQLDataNodeOperator.java | 2 +-
.../SqlDataNodeOperator.java} | 50 +++---------
.../service/source/AbstractSourceOperator.java | 23 +-----
.../service/source/StreamSourceOperator.java | 6 ++
.../service/source/cos/COSSourceOperator.java | 15 ++++
.../service/source/file/FileSourceOperator.java | 17 ++++
.../SqlSourceOperator.java} | 73 ++++++++++-------
18 files changed, 488 insertions(+), 142 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 35164d31d6..e30df92573 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -30,6 +30,7 @@ public class DataNodeType {
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String MYSQL = "MYSQL";
public static final String COS = "COS";
+ public static final String SQL = "SQL";
public static final String STARROCKS = "STARROCKS";
public static final String REDIS = "REDIS";
public static final String KUDU = "KUDU";
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 59deca4749..263950297d 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -32,7 +32,7 @@ public class SourceType extends StreamType {
public static final String FILE = "FILE";
public static final String COS = "COS";
- public static final String MYSQL_SQL = "MYSQL_SQL";
+ public static final String SQL = "SQL";
public static final String MYSQL_BINLOG = "MYSQL_BINLOG";
public static final String MONGODB = "MONGODB";
public static final String REDIS = "REDIS";
@@ -49,7 +49,6 @@ public class SourceType extends StreamType {
put(FILE, TaskTypeEnum.FILE);
put(COS, TaskTypeEnum.COS);
- put(MYSQL_SQL, TaskTypeEnum.SQL);
put(MYSQL_BINLOG, TaskTypeEnum.BINLOG);
put(POSTGRESQL, TaskTypeEnum.POSTGRES);
put(ORACLE, TaskTypeEnum.ORACLE);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java
new file mode 100644
index 0000000000..4bd2a3a2ba
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.sql;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sql data node info
+ */
+@Data
+@NoArgsConstructor
+@ApiModel("Sql data node info")
+public class SqlDataNodeDTO {
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static SqlDataNodeDTO getFromRequest(SqlDataNodeRequest request,
String extParams) {
+ SqlDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+ ? SqlDataNodeDTO.getFromJson(extParams)
+ : new SqlDataNodeDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static SqlDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, SqlDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+ String.format("Failed to parse extParams for Sql node:
%s", e.getMessage()));
+ }
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java
new file mode 100644
index 0000000000..fc9e83f178
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.sql;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Sql data node info
+ */
+@Data
+@SuperBuilder
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.SQL)
+@ApiModel("Sql data node info")
+public class SqlDataNodeInfo extends DataNodeInfo {
+
+ public SqlDataNodeInfo() {
+ this.setType(DataNodeType.SQL);
+ }
+
+ @Override
+ public SqlDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, SqlDataNodeRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java
new file mode 100644
index 0000000000..df75e3d653
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.sql;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Sql data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.SQL)
+@ApiModel("Sql data node request")
+public class SqlDataNodeRequest extends DataNodeRequest {
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
index 1dd85385fc..4d2bfed790 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
@@ -104,10 +104,8 @@ public class COSSourceDTO {
public static COSSourceDTO getFromJson(@NotNull String extParams) {
try {
- log.info("teste extparmas={}", extParams);
return JsonUtils.parseObject(extParams, COSSourceDTO.class);
} catch (Exception e) {
- log.info("teste extparmas=eoor:", e);
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
String.format("parse extParams of COSSource failure: %s",
e.getMessage()));
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java
new file mode 100644
index 0000000000..50a87b03df
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.sql;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.SQL)
+@ApiModel(value = "Sql data add task request")
+public class SqlDataAddTaskRequest extends DataAddTaskRequest {
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java
new file mode 100644
index 0000000000..7bfa03dc60
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.sql;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Sql source info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Sql source info")
+@JsonTypeDefine(value = SourceType.SQL)
+public class SqlSource extends StreamSource {
+
+ @ApiModelProperty(value = "sql", required = true)
+ private String sql;
+
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit;
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry;
+
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
+
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute
before, "
+ + "'1h' means from one hour after, '-1h' means from one hour
before, "
+ + "'1d' means from one day after, '-1d' means from one day
before, "
+ + "Null or blank means from current timestamp")
+ private String timeOffset;
+
+ @ApiModelProperty("Max instance count")
+ private Integer maxInstanceCount;
+
+ @ApiModelProperty("Jdbc url")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("jdbc password")
+ private String jdbcPassword;
+
+ @ApiModelProperty("Fetch size")
+ private Integer fetchSize;
+
+ public SqlSource() {
+ this.setSourceType(SourceType.SQL);
+ }
+
+ @Override
+ public SourceRequest genSourceRequest() {
+ return CommonBeanUtils.copyProperties(this, SqlSourceRequest::new);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java
similarity index 58%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java
index 1dd85385fc..bdcdb773cb 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.source.cos;
+package org.apache.inlong.manager.pojo.source.sql;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -32,29 +32,24 @@ import org.apache.commons.lang3.StringUtils;
import javax.validation.constraints.NotNull;
-import java.util.List;
-
/**
- * COS source information data transfer object
+ * Sql source information data transfer object
*/
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
@Slf4j
-public class COSSourceDTO {
+public class SqlSourceDTO {
- @ApiModelProperty(value = "Path regex pattern for file, such as
/a/b/*.txt", required = true)
- private String pattern;
+ @ApiModelProperty(value = "sql", required = true)
+ private String sql;
@ApiModelProperty("Cycle unit")
- private String cycleUnit = "D";
+ private String cycleUnit;
@ApiModelProperty("Whether retry")
- private Boolean retry = false;;
-
- @ApiModelProperty("Column separator of data source ")
- private String dataSeparator;
+ private Boolean retry = false;
@ApiModelProperty(value = "Data start time")
private String dataTimeFrom;
@@ -69,47 +64,40 @@ public class COSSourceDTO {
+ "Null or blank means from current timestamp")
private String timeOffset;
- @ApiModelProperty("Max file count")
- private String maxFileCount;
+ @ApiModelProperty("Max instance count")
+ private Integer maxInstanceCount;
- @ApiModelProperty(" Type of data result for column separator"
- + " CSV format, set this parameter to a custom separator:
, | : "
- + " Json format, set this parameter to json ")
- private String contentStyle;
+ @ApiModelProperty("Jdbc url")
+ private String jdbcUrl;
- @ApiModelProperty(value = "Audit version")
- private String auditVersion;
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
- @ApiModelProperty("filterStreams")
- private List<String> filterStreams;
+ @ApiModelProperty("jdbc password")
+ private String jdbcPassword;
- @ApiModelProperty(value = "COS bucket name")
- private String bucketName;
+ @ApiModelProperty("Fetch size")
+ private Integer fetchSize;
- @ApiModelProperty(value = "COS secret id")
- private String credentialsId;
-
- @ApiModelProperty(value = "COS secret key")
- private String credentialsKey;
+ @ApiModelProperty("Column separator of data source ")
+ private String dataSeparator;
- @ApiModelProperty(value = "COS region")
- private String region;
+ @ApiModelProperty(value = "Audit version")
+ private String auditVersion;
- public static COSSourceDTO getFromRequest(@NotNull COSSourceRequest
cosSourceRequest, String extParams) {
- COSSourceDTO dto = StringUtils.isNotBlank(extParams)
- ? COSSourceDTO.getFromJson(extParams)
- : new COSSourceDTO();
- return CommonBeanUtils.copyProperties(cosSourceRequest, dto, true);
+ public static SqlSourceDTO getFromRequest(@NotNull SqlSourceRequest
sqlSourceRequest, String extParams) {
+ SqlSourceDTO dto = StringUtils.isNotBlank(extParams)
+ ? SqlSourceDTO.getFromJson(extParams)
+ : new SqlSourceDTO();
+ return CommonBeanUtils.copyProperties(sqlSourceRequest, dto, true);
}
- public static COSSourceDTO getFromJson(@NotNull String extParams) {
+ public static SqlSourceDTO getFromJson(@NotNull String extParams) {
try {
- log.info("teste extparmas={}", extParams);
- return JsonUtils.parseObject(extParams, COSSourceDTO.class);
+ return JsonUtils.parseObject(extParams, SqlSourceDTO.class);
} catch (Exception e) {
- log.info("teste extparmas=eoor:", e);
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
- String.format("parse extParams of COSSource failure: %s",
e.getMessage()));
+ String.format("parse extParams of SqlSource failure: %s",
e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java
new file mode 100644
index 0000000000..6d76f88edd
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.sql;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.SQL)
+@ApiModel(value = "Sql source request")
+public class SqlSourceRequest extends SourceRequest {
+
+ @ApiModelProperty(value = "sql", required = true)
+ private String sql;
+
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit;
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry;
+
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
+
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute
before, "
+ + "'1h' means from one hour after, '-1h' means from one hour
before, "
+ + "'1d' means from one day after, '-1d' means from one day
before, "
+ + "Null or blank means from current timestamp")
+ private String timeOffset;
+
+ @ApiModelProperty("Max instance count")
+ private Integer maxInstanceCount;
+
+ @ApiModelProperty("Jdbc url")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("jdbc password")
+ private String jdbcPassword;
+
+ @ApiModelProperty("Fetch size")
+ private Integer fetchSize;
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 5f7958a3e1..5f4c775b10 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -401,6 +401,13 @@ public class AgentServiceImpl implements AgentService {
if (moduleConfig == null) {
continue;
}
+ if (configResult != null &&
CollectionUtils.isNotEmpty(configResult.getModuleList())) {
+ for (ModuleConfig config :
configResult.getModuleList()) {
+ if (Objects.equals(config.getEntityId(),
moduleId)) {
+ restartTime = config.getRestartTime();
+ }
+ }
+ }
moduleConfig.setRestartTime(restartTime);
String moduleStr = GSON.toJson(moduleConfig);
String moduleMd5 = DigestUtils.md5Hex(moduleStr);
@@ -591,13 +598,13 @@ public class AgentServiceImpl implements AgentService {
@Override
public ConfigResult getConfig(ConfigRequest request) {
- if (!updateModuleConfigQueue.contains(request)) {
- updateModuleConfigQueue.add(request);
- }
String key = request.getLocalIp() + InlongConstants.UNDERSCORE +
request.getClusterName();
ConfigResult configResult = installerConfigMap.get(key);
if (configResult == null) {
+ if (!updateModuleConfigQueue.contains(request)) {
+ updateModuleConfigQueue.add(request);
+ }
LOGGER.debug(String.format("can not get config result for cluster
name=%s, ip=%s", request.getClusterName(),
request.getLocalIp()));
return null;
@@ -653,8 +660,7 @@ public class AgentServiceImpl implements AgentService {
needAddStatusList =
Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(),
SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
}
- List<String> sourceTypes = Lists.newArrayList(SourceType.MYSQL_SQL,
SourceType.KAFKA,
- SourceType.MYSQL_BINLOG, SourceType.POSTGRESQL);
+ List<String> sourceTypes = Lists.newArrayList(SourceType.KAFKA,
SourceType.MYSQL_BINLOG, SourceType.POSTGRESQL);
List<StreamSourceEntity> sourceEntities =
sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes,
TASK_FETCH_SIZE);
for (StreamSourceEntity sourceEntity : sourceEntities) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index e9b3f002fd..13428de606 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -118,7 +118,7 @@ public class MySQLDataNodeOperator extends
AbstractDataNodeOperator {
|| !Objects.equals(nodeRequest.getUsername(),
nodeInfo.getUsername())
|| !Objects.equals(nodeRequest.getToken(),
nodeInfo.getToken());
if (changed) {
- retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(),
SourceType.MYSQL_SQL, operator);
+ retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(),
SourceType.MYSQL_BINLOG, operator);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java
similarity index 60%
copy from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
copy to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java
index e9b3f002fd..7ca7d344b8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.node.mysql;
+package org.apache.inlong.manager.service.node.sql;
import org.apache.inlong.manager.common.consts.DataNodeType;
-import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -27,31 +26,24 @@ import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
-import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
-import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest;
+import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.Connection;
-import java.util.Objects;
/**
- * MySQL data node operator
+ * Sql data node operator
*/
@Service
-public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
+public class SqlDataNodeOperator extends AbstractDataNodeOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MySQLDataNodeOperator.class);
-
- @Autowired
- private ObjectMapper objectMapper;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SqlDataNodeOperator.class);
@Override
public Boolean accept(String dataNodeType) {
@@ -60,7 +52,7 @@ public class MySQLDataNodeOperator extends
AbstractDataNodeOperator {
@Override
public String getDataNodeType() {
- return DataNodeType.MYSQL;
+ return DataNodeType.SQL;
}
@Override
@@ -69,26 +61,15 @@ public class MySQLDataNodeOperator extends
AbstractDataNodeOperator {
throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
}
- MySQLDataNodeInfo dataNodeInfo = new MySQLDataNodeInfo();
+ SqlDataNodeInfo dataNodeInfo = new SqlDataNodeInfo();
CommonBeanUtils.copyProperties(entity, dataNodeInfo);
- if (StringUtils.isNotBlank(entity.getExtParams())) {
- MySQLDataNodeDTO dto =
MySQLDataNodeDTO.getFromJson(entity.getExtParams());
- CommonBeanUtils.copyProperties(dto, dataNodeInfo);
- }
return dataNodeInfo;
}
@Override
protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
- MySQLDataNodeRequest dataNodeRequest = (MySQLDataNodeRequest) request;
+ SqlDataNodeRequest dataNodeRequest = (SqlDataNodeRequest) request;
CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true);
- try {
- MySQLDataNodeDTO dto =
MySQLDataNodeDTO.getFromRequest(dataNodeRequest, targetEntity.getExtParams());
- targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
- } catch (Exception e) {
- throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
- String.format("Failed to build extParams for MySQL node:
%s", e.getMessage()));
- }
}
@Override
@@ -109,17 +90,4 @@ public class MySQLDataNodeOperator extends
AbstractDataNodeOperator {
}
}
- @Override
- public void updateRelatedStreamSource(DataNodeRequest request,
DataNodeEntity dataNodeEntity, String operator) {
- MySQLDataNodeRequest nodeRequest = (MySQLDataNodeRequest) request;
- MySQLDataNodeInfo nodeInfo = (MySQLDataNodeInfo)
this.getFromEntity(dataNodeEntity);
- boolean changed = !Objects.equals(nodeRequest.getUrl(),
nodeInfo.getUrl())
- || !Objects.equals(nodeRequest.getBackupUrl(),
nodeInfo.getBackupUrl())
- || !Objects.equals(nodeRequest.getUsername(),
nodeInfo.getUsername())
- || !Objects.equals(nodeRequest.getToken(),
nodeInfo.getToken());
- if (changed) {
- retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(),
SourceType.MYSQL_SQL, operator);
- }
- }
-
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 83382dbcc1..c65c18b968 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -59,8 +59,6 @@ import
org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
-import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeService;
@@ -545,26 +543,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
? TaskStateEnum.RUNNING.getType()
: TaskStateEnum.FROZEN.getType());
dataConfig.setSyncSend(streamEntity.getSyncSend());
- if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) {
- String dataSeparator = String.valueOf((char)
Integer.parseInt(streamEntity.getDataSeparator()));
- FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams,
FileSourceDTO.class);
- if (Objects.nonNull(fileSourceDTO)) {
- fileSourceDTO.setDataSeparator(dataSeparator);
-
dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
-
fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
- extParams = JsonUtils.toJsonString(fileSourceDTO);
- }
- }
- if (SourceType.COS.equalsIgnoreCase(entity.getSourceType())) {
- String dataSeparator = String.valueOf((char)
Integer.parseInt(streamEntity.getDataSeparator()));
- COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams,
COSSourceDTO.class);
- if (Objects.nonNull(cosSourceDTO)) {
- cosSourceDTO.setDataSeparator(dataSeparator);
- dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion());
- cosSourceDTO.setContentStyle(streamEntity.getDataType());
- extParams = JsonUtils.toJsonString(cosSourceDTO);
- }
- }
+ extParams = sourceOperator.updateDataConfig(extParams,
streamEntity, dataConfig);
InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
// Processing extParams
unpackExtParams(streamEntity.getExtParams(), streamInfo);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 07b0879330..19e7f61190 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.source;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -144,4 +146,8 @@ public interface StreamSourceOperator {
*/
void updateAgentTaskConfig(SourceRequest request, String operator);
+ /**
+ * Hook that lets a source operator adjust the extension parameters and the data config
+ * using values taken from the stream entity.
+ * This default implementation changes nothing and returns the given extParams as-is.
+ *
+ * @param extParams extension parameters of the source
+ * @param streamEntity stream entity passed in by the caller
+ * @param dataConfig data config that an implementation may update
+ * @return the extension parameters to use; here, the unchanged input
+ */
+ default String updateDataConfig(String extParams, InlongStreamEntity
streamEntity, DataConfig dataConfig) {
+ return extParams;
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
index f99a6321bd..c533d7df7f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
@@ -17,12 +17,14 @@
package org.apache.inlong.manager.service.source.cos;
+import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
@@ -157,4 +159,17 @@ public class COSSourceOperator extends
AbstractSourceOperator {
}
}
+    /**
+     * Writes stream-level settings into the COS source extension parameters.
+     *
+     * <p>The extParams JSON is parsed into a {@link COSSourceDTO}. The stream's data separator
+     * (a decimal character code) and data type are written into the DTO, and the DTO's audit
+     * version is copied to {@code dataConfig}.
+     *
+     * @param extParams JSON string of the source extension parameters
+     * @param streamEntity stream supplying the data separator and the data type
+     * @param dataConfig data config that receives the audit version
+     * @return the re-serialized extParams, or the given extParams unchanged when parsing yields no DTO
+     * @throws NumberFormatException if the data separator is present but is not a decimal number
+     */
+    @Override
+    public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) {
+        COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams, COSSourceDTO.class);
+        if (Objects.isNull(cosSourceDTO)) {
+            return extParams;
+        }
+        // Integer.parseInt rejects null/blank input with NumberFormatException, so the separator
+        // is converted only when one is actually set on the stream.
+        String separatorCode = streamEntity.getDataSeparator();
+        if (separatorCode != null && !separatorCode.trim().isEmpty()) {
+            cosSourceDTO.setDataSeparator(String.valueOf((char) Integer.parseInt(separatorCode.trim())));
+        }
+        dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion());
+        cosSourceDTO.setContentStyle(streamEntity.getDataType());
+        return JsonUtils.toJsonString(cosSourceDTO);
+    }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index faf797dc7a..90a0212aa1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -17,10 +17,13 @@
package org.apache.inlong.manager.service.source.file;
+import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
@@ -43,6 +46,7 @@ import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -140,4 +144,17 @@ public class FileSourceOperator extends
AbstractSourceOperator {
}
}
+    /**
+     * Writes stream-level settings into the file source extension parameters.
+     *
+     * <p>The extParams JSON is parsed into a {@link FileSourceDTO}. The stream's data separator
+     * (a decimal character code) and data type are written into the DTO, and the DTO's audit
+     * version is copied to {@code dataConfig}.
+     *
+     * @param extParams JSON string of the source extension parameters
+     * @param streamEntity stream supplying the data separator and the data type
+     * @param dataConfig data config that receives the audit version
+     * @return the re-serialized extParams, or the given extParams unchanged when parsing yields no DTO
+     * @throws NumberFormatException if the data separator is present but is not a decimal number
+     */
+    @Override
+    public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) {
+        FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
+        if (Objects.isNull(fileSourceDTO)) {
+            return extParams;
+        }
+        // Integer.parseInt rejects null/blank input with NumberFormatException, so the separator
+        // is converted only when one is actually set on the stream.
+        String separatorCode = streamEntity.getDataSeparator();
+        if (separatorCode != null && !separatorCode.trim().isEmpty()) {
+            fileSourceDTO.setDataSeparator(String.valueOf((char) Integer.parseInt(separatorCode.trim())));
+        }
+        dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
+        fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
+        return JsonUtils.toJsonString(fileSourceDTO);
+    }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java
similarity index 69%
copy from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
copy to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java
index f99a6321bd..20425fad84 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java
@@ -15,25 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.source.cos;
+package org.apache.inlong.manager.service.source.sql;
+import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
-import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeInfo;
import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.cos.COSDataAddTaskRequest;
-import org.apache.inlong.manager.pojo.source.cos.COSSource;
-import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
-import org.apache.inlong.manager.pojo.source.cos.COSSourceRequest;
+import org.apache.inlong.manager.pojo.source.sql.SqlDataAddTaskRequest;
+import org.apache.inlong.manager.pojo.source.sql.SqlSource;
+import org.apache.inlong.manager.pojo.source.sql.SqlSourceDTO;
+import org.apache.inlong.manager.pojo.source.sql.SqlSourceRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;
@@ -51,50 +54,52 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
- * COS source operator, such as get or set COS source info.
+ * SQL source operator, used to get or set SQL source info.
*/
@Service
-public class COSSourceOperator extends AbstractSourceOperator {
+public class SqlSourceOperator extends AbstractSourceOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(COSSourceOperator.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SqlSourceOperator.class);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private StreamSourceEntityMapper sourceMapper;
+ @Autowired
+ private InlongStreamEntityMapper streamEntityMapper;
@Override
public Boolean accept(String sourceType) {
- return SourceType.COS.equals(sourceType);
+ return SourceType.SQL.equals(sourceType);
}
@Override
protected String getSourceType() {
- return SourceType.COS;
+ return SourceType.SQL;
}
@Override
protected void setTargetEntity(SourceRequest request, StreamSourceEntity
targetEntity) {
- COSSourceRequest sourceRequest = (COSSourceRequest) request;
+ SqlSourceRequest sourceRequest = (SqlSourceRequest) request;
try {
CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
- COSSourceDTO dto = COSSourceDTO.getFromRequest(sourceRequest,
targetEntity.getExtParams());
+ SqlSourceDTO dto = SqlSourceDTO.getFromRequest(sourceRequest,
targetEntity.getExtParams());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
- String.format("serialize extParams of COS SourceDTO
failure: %s", e.getMessage()));
+ String.format("serialize extParams of Sql SourceDTO
failure: %s", e.getMessage()));
}
}
@Override
public StreamSource getFromEntity(StreamSourceEntity entity) {
- COSSource source = new COSSource();
+ SqlSource source = new SqlSource();
if (entity == null) {
return source;
}
- COSSourceDTO dto = COSSourceDTO.getFromJson(entity.getExtParams());
+ SqlSourceDTO dto = SqlSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, source, true);
CommonBeanUtils.copyProperties(dto, source, true);
@@ -113,13 +118,15 @@ public class COSSourceOperator extends
AbstractSourceOperator {
@Override
public String getExtParams(StreamSourceEntity sourceEntity) {
- COSSourceDTO cosSourceDTO =
COSSourceDTO.getFromJson(sourceEntity.getExtParams());
- if (Objects.nonNull(cosSourceDTO) &&
StringUtils.isNotBlank(sourceEntity.getDataNodeName())) {
- COSDataNodeInfo dataNodeInfo =
- (COSDataNodeInfo)
dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(),
- DataNodeType.COS);
- CommonBeanUtils.copyProperties(dataNodeInfo, cosSourceDTO, true);
- return JsonUtils.toJsonString(cosSourceDTO);
+ SqlSourceDTO sqlSourceDTO =
SqlSourceDTO.getFromJson(sourceEntity.getExtParams());
+ if (Objects.nonNull(sqlSourceDTO) &&
StringUtils.isNotBlank(sourceEntity.getDataNodeName())) {
+ SqlDataNodeInfo dataNodeInfo =
+ (SqlDataNodeInfo)
dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(),
+ DataNodeType.SQL);
+ sqlSourceDTO.setJdbcUrl(dataNodeInfo.getUrl());
+ sqlSourceDTO.setUsername(dataNodeInfo.getUsername());
+ sqlSourceDTO.setJdbcPassword(dataNodeInfo.getToken());
+ return JsonUtils.toJsonString(sqlSourceDTO);
}
return sourceEntity.getExtParams();
}
@@ -128,16 +135,15 @@ public class COSSourceOperator extends
AbstractSourceOperator {
@Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
public Integer addDataAddTask(DataAddTaskRequest request, String operator)
{
try {
- COSDataAddTaskRequest sourceRequest = (COSDataAddTaskRequest)
request;
+ SqlDataAddTaskRequest sourceRequest = (SqlDataAddTaskRequest)
request;
StreamSourceEntity sourceEntity =
sourceMapper.selectById(request.getSourceId());
- COSSourceDTO dto =
COSSourceDTO.getFromJson(sourceEntity.getExtParams());
+ SqlSourceDTO dto =
SqlSourceDTO.getFromJson(sourceEntity.getExtParams());
dto.setDataTimeFrom(sourceRequest.getDataTimeFrom());
dto.setDataTimeTo(sourceRequest.getDataTimeTo());
dto.setRetry(true);
if (request.getIncreaseAuditVersion()) {
dto.setAuditVersion(request.getAuditVersion());
}
- dto.setFilterStreams(sourceRequest.getFilterStreams());
StreamSourceEntity dataAddTaskEntity =
CommonBeanUtils.copyProperties(sourceEntity,
StreamSourceEntity::new);
dataAddTaskEntity.setId(null);
@@ -151,10 +157,21 @@ public class COSSourceOperator extends
AbstractSourceOperator {
updateAgentTaskConfig(dataAddTaskRequest, operator);
return id;
} catch (Exception e) {
- LOGGER.error("serialize extParams of COS SourceDTO failure: ", e);
+ LOGGER.error("serialize extParams of Sql SourceDTO failure: ", e);
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
- String.format("serialize extParams of COS SourceDTO
failure: %s", e.getMessage()));
+ String.format("serialize extParams of Sql SourceDTO
failure: %s", e.getMessage()));
}
}
+    /**
+     * Writes stream-level settings into the SQL source extension parameters.
+     *
+     * <p>The extParams JSON is parsed into a {@link SqlSourceDTO}. The stream's data separator
+     * (a decimal character code) is written into the DTO, and the DTO's audit version is copied
+     * to {@code dataConfig}.
+     *
+     * @param extParams JSON string of the source extension parameters
+     * @param streamEntity stream supplying the data separator
+     * @param dataConfig data config that receives the audit version
+     * @return the re-serialized extParams, or the given extParams unchanged when parsing yields no DTO
+     * @throws NumberFormatException if the data separator is present but is not a decimal number
+     */
+    @Override
+    public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) {
+        SqlSourceDTO sqlSourceDTO = JsonUtils.parseObject(extParams, SqlSourceDTO.class);
+        if (Objects.isNull(sqlSourceDTO)) {
+            return extParams;
+        }
+        // Integer.parseInt rejects null/blank input with NumberFormatException, so the separator
+        // is converted only when one is actually set on the stream.
+        String separatorCode = streamEntity.getDataSeparator();
+        if (separatorCode != null && !separatorCode.trim().isEmpty()) {
+            sqlSourceDTO.setDataSeparator(String.valueOf((char) Integer.parseInt(separatorCode.trim())));
+        }
+        dataConfig.setAuditVersion(sqlSourceDTO.getAuditVersion());
+        return JsonUtils.toJsonString(sqlSourceDTO);
+    }
}