This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 266148ccd [INLONG-4133][Manager] Not pass the type field when querying
sources and sinks (#4135)
266148ccd is described below
commit 266148ccd336d2de64dbf0e8391a2d030a1c92a9
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 10 22:37:32 2022 +0800
[INLONG-4133][Manager] Not pass the type field when querying sources and
sinks (#4135)
* When updating, skip the record with the same ID in the duplicate-name check
* Remove the requirement that the type must be passed in paging queries
* Remove the type parameter from the delete functions
* Change the exception log
* Change the check of the source name
Co-authored-by: healchow <[email protected]>
---
.../api/impl/DefaultInlongStreamBuilder.java | 6 +--
.../manager/client/api/impl/InlongStreamImpl.java | 6 +--
.../client/api/inner/InnerInlongManagerClient.java | 8 +---
.../manager/dao/mapper/StreamSinkEntityMapper.java | 6 ++-
.../resources/mappers/StreamSinkEntityMapper.xml | 11 +++--
.../manager/service/sink/StreamSinkOperation.java | 7 ++-
.../manager/service/sink/StreamSinkService.java | 8 ++--
.../service/sink/StreamSinkServiceImpl.java | 52 ++++++++++++++++------
.../service/sink/ck/ClickHouseSinkOperation.java | 5 +--
.../service/sink/hive/HiveSinkOperation.java | 5 +--
.../service/sink/iceberg/IcebergSinkOperation.java | 5 +--
.../service/sink/kafka/KafkaSinkOperation.java | 5 +--
.../service/source/AbstractSourceOperation.java | 33 +++++++++++---
.../service/source/StreamSourceOperation.java | 6 +--
.../service/source/StreamSourceService.java | 14 +++---
.../service/source/StreamSourceServiceImpl.java | 34 ++++++++------
.../listener/AbstractSourceOperateListener.java | 2 +-
.../source/listener/SourceDeleteListener.java | 2 +-
.../source/listener/SourceRestartListener.java | 2 +-
.../source/listener/SourceStopListener.java | 2 +-
.../service/core/impl/AgentServiceTest.java | 2 +-
.../core/sink/ClickHouseStreamSinkServiceTest.java | 6 +--
.../core/sink/HiveStreamSinkServiceTest.java | 8 ++--
.../core/sink/IcebergStreamSinkServiceTest.java | 8 ++--
.../core/sink/KafkaStreamSinkServiceTest.java | 6 +--
.../core/source/StreamSourceServiceTest.java | 10 ++---
.../source/listener/DataSourceListenerTest.java | 5 +--
.../web/controller/StreamSinkController.java | 20 +++------
.../web/controller/StreamSourceController.java | 20 +++------
29 files changed, 163 insertions(+), 141 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index c9b0a8fd0..c744ac407 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -212,9 +212,8 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
for (SourceListResponse sourceListResponse : sourceListResponses) {
final String sourceName = sourceListResponse.getSourceName();
final int id = sourceListResponse.getId();
- final String type = sourceListResponse.getSourceType();
if (sourceRequests.get(sourceName) == null) {
- boolean isDelete = managerClient.deleteSource(id, type);
+ boolean isDelete = managerClient.deleteSource(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete source=%s
failed", sourceListResponse));
}
@@ -251,9 +250,8 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
for (SinkListResponse sinkListResponse : sinkListResponses) {
final String sinkName = sinkListResponse.getSinkName();
final int id = sinkListResponse.getId();
- final String type = sinkListResponse.getSinkType();
if (sinkRequests.get(sinkName) == null) {
- boolean isDelete = managerClient.deleteSink(id, type);
+ boolean isDelete = managerClient.deleteSink(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete sink=%s
failed", sinkListResponse));
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 11da4a2ef..ebfba5909 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -363,9 +363,8 @@ public class InlongStreamImpl extends InlongStream {
for (SourceListResponse sourceListResponse : sourceListResponses) {
final String sourceName = sourceListResponse.getSourceName();
final int id = sourceListResponse.getId();
- final String type = sourceListResponse.getSourceType();
if (this.streamSources.get(sourceName) == null) {
- boolean isDelete = managerClient.deleteSource(id, type);
+ boolean isDelete = managerClient.deleteSource(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete source=%s
failed", sourceListResponse));
}
@@ -400,9 +399,8 @@ public class InlongStreamImpl extends InlongStream {
for (SinkListResponse sinkListResponse : sinkListResponses) {
final String sinkName = sinkListResponse.getSinkName();
final int id = sinkListResponse.getId();
- final String type = sinkListResponse.getSinkType();
if (this.streamSinks.get(sinkName) == null) {
- boolean isDelete = managerClient.deleteSink(id, type);
+ boolean isDelete = managerClient.deleteSink(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete sink=%s
failed", sinkListResponse));
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index fa6def6cd..daffd1e4b 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -480,12 +480,10 @@ public class InnerInlongManagerClient {
}
}
- public boolean deleteSource(int id, String type) {
+ public boolean deleteSource(int id) {
AssertUtil.isTrue(id > 0, "sourceId is illegal");
- AssertUtil.notEmpty(type, "sourceType should not be null");
final String path = HTTP_PATH + "/source/delete/" + id;
String url = formatUrl(path);
- url = String.format("%s&sourceType=%s", url, type);
RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), "");
Request request = new Request.Builder()
.url(url)
@@ -638,12 +636,10 @@ public class InnerInlongManagerClient {
}
}
- public boolean deleteSink(int id, String type) {
+ public boolean deleteSink(int id) {
AssertUtil.isTrue(id > 0, "sinkId is illegal");
- AssertUtil.notEmpty(type, "sinkType should not be null");
final String path = HTTP_PATH + "/sink/delete/" + id;
String url = formatUrl(path);
- url = String.format("%s&sinkType=%s", url, type);
RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), "");
Request request = new Request.Builder()
.url(url)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 8ec752811..b6f88b833 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -18,8 +18,8 @@
package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.springframework.stereotype.Repository;
@@ -63,9 +63,11 @@ public interface StreamSinkEntityMapper {
*
* @param groupId Inlong group id.
* @param streamId Inlong stream id.
+ * @param sinkName Stream sink name.
* @return Sink entity list.
*/
- List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId,
@Param("streamId") String streamId);
+ List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId,
@Param("streamId") String streamId,
+ @Param("sinkName") String sinkName);
/**
* According to the group id, stream id and sink type, query valid sink
entity list.
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index d7b337a3e..bddd60860 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -94,16 +94,16 @@
<if test="request.status != null and request.status != ''">
and status = #{request.status, jdbcType=INTEGER}
</if>
- <if test="request.inlongClusterName != null and request.status !=
''">
+ <if test="request.inlongClusterName != null and
request.inlongClusterName != ''">
and inlong_cluster_name = #{request.inlongClusterName,
jdbcType=VARCHAR}
</if>
- <if test="request.dataNodeName != null and request.status != ''">
+ <if test="request.dataNodeName != null and request.dataNodeName !=
''">
and data_node_name = #{request.dataNodeName, jdbcType=VARCHAR}
</if>
- <if test="request.sortTaskName != null and request.status != ''">
+ <if test="request.sortTaskName != null and request.sortTaskName !=
''">
and sort_task_name = #{request.sortTaskName, jdbcType=VARCHAR}
</if>
- <if test="request.sortConsumerGroup != null and request.status !=
''">
+ <if test="request.sortConsumerGroup != null and
request.sortConsumerGroup != ''">
and sort_consumer_group = #{request.sortConsumerGroup,
jdbcType=VARCHAR}
</if>
order by modify_time desc
@@ -131,6 +131,9 @@
<if test="streamId != null and streamId != ''">
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</if>
+ <if test="sinkName != null and sinkName != ''">
+ and sink_name = #{sinkName, jdbcType=VARCHAR}
+ </if>
</where>
</select>
<select id="selectByIdAndType"
resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
index d704c8075..43845b282 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
@@ -57,13 +57,12 @@ public interface StreamSinkOperation {
}
/**
- * Get sink info by sink type and sink id.
+ * Get sink info by the given entity.
*
- * @param sinkType Sink type.
- * @param id Sink id.
+ * @param entity the given entity.
* @return Sink info.
*/
- SinkResponse getById(String sinkType, Integer id);
+ SinkResponse getByEntity(StreamSinkEntity entity);
/**
* Get the target from the given entity.
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index b04784b9c..90fdf07de 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -42,13 +42,12 @@ public interface StreamSinkService {
Integer save(SinkRequest request, String operator);
/**
- * Query sink information based on id and type.
+ * Query sink information based on id.
*
* @param id Sink id.
- * @param sinkType Sink type.
* @return Sink info.
*/
- SinkResponse get(Integer id, String sinkType);
+ SinkResponse get(Integer id);
/**
* Query sink information based on inlong group id and inlong stream id.
@@ -107,11 +106,10 @@ public interface StreamSinkService {
* Delete the stream sink by the given id and sink type.
*
* @param id The primary key of the sink.
- * @param sinkType Sink type.
* @param operator Operator's name.
* @return Whether succeed
*/
- Boolean delete(Integer id, String sinkType, String operator);
+ Boolean delete(Integer id, String operator);
/**
* Logically delete stream sink with the given conditions.
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 0ef33faef..000e7cda6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
@@ -52,6 +53,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -96,8 +98,14 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
// Make sure that there is no sink info with the current groupId and
streamId
String streamId = request.getInlongStreamId();
String sinkType = request.getSinkType();
- List<StreamSinkEntity> sinkExist =
sinkMapper.selectByIdAndType(groupId, streamId, sinkType);
- Preconditions.checkEmpty(sinkExist,
ErrorCodeEnum.SINK_ALREADY_EXISTS.getMessage());
+ String sinkName = request.getSinkName();
+ List<StreamSinkEntity> sinkList =
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+ for (StreamSinkEntity sinkEntity : sinkList) {
+ if (sinkEntity != null && Objects.equals(sinkEntity.getSinkName(),
sinkName)) {
+ String err = "sink name=%s already exists with the groupId=%s
streamId=%s";
+ throw new BusinessException(String.format(err, sinkName,
groupId, streamId));
+ }
+ }
// According to the sink type, save sink information
StreamSinkOperation operation =
operationFactory.getInstance(SinkType.forType(sinkType));
@@ -114,10 +122,17 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
}
@Override
- public SinkResponse get(Integer id, String sinkType) {
+ public SinkResponse get(Integer id) {
+ Preconditions.checkNotNull(id, "sink id is empty");
+ StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+ if (entity == null) {
+ LOGGER.error("sink not found by id={}", id);
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+ }
+ String sinkType = entity.getSinkType();
StreamSinkOperation operation =
operationFactory.getInstance(SinkType.forType(sinkType));
- SinkResponse sinkResponse = operation.getById(sinkType, id);
- LOGGER.debug("success to get sink by id={}, sinkType={}", id,
sinkType);
+ SinkResponse sinkResponse = operation.getByEntity(entity);
+ LOGGER.debug("success to get sink info by id={}", id);
return sinkResponse;
}
@@ -131,12 +146,12 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
@Override
public List<SinkResponse> listSink(String groupId, String streamId) {
Preconditions.checkNotNull(groupId,
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- List<StreamSinkEntity> entityList =
sinkMapper.selectByRelatedId(groupId, streamId);
+ List<StreamSinkEntity> entityList =
sinkMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
}
List<SinkResponse> responseList = new ArrayList<>();
- entityList.forEach(entity -> responseList.add(this.get(entity.getId(),
entity.getSinkType())));
+ entityList.forEach(entity ->
responseList.add(this.get(entity.getId())));
LOGGER.debug("success to list sink by groupId={}, streamId={}",
groupId, streamId);
return responseList;
@@ -157,6 +172,7 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
@Override
public PageInfo<? extends SinkListResponse>
listByCondition(SinkPageRequest request) {
Preconditions.checkNotNull(request.getInlongGroupId(),
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
PageHelper.startPage(request.getPageNum(), request.getPageSize());
List<StreamSinkEntity> entityPage =
sinkMapper.selectByCondition(request);
Map<SinkType, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -187,10 +203,20 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
// Check if it can be modified
String groupId = request.getInlongGroupId();
- InlongGroupEntity groupEntity =
commonOperateService.checkGroupStatus(groupId, operator);
-
String streamId = request.getInlongStreamId();
+ String sinkName = request.getSinkName();
String sinkType = request.getSinkType();
+ InlongGroupEntity groupEntity =
commonOperateService.checkGroupStatus(groupId, operator);
+
+ // Check whether the sink name exists with the same groupId and
streamId
+ List<StreamSinkEntity> sinkList =
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+ for (StreamSinkEntity entity : sinkList) {
+ Integer sinkId = entity.getId();
+ if (!Objects.equals(request.getId(), sinkId) &&
Objects.equals(entity.getSinkName(), sinkName)) {
+ String err = "sink name=%s already exists with the groupId=%s
streamId=%s";
+ throw new BusinessException(String.format(err, sinkName,
groupId, streamId));
+ }
+ }
StreamSinkOperation operation =
operationFactory.getInstance(SinkType.forType(sinkType));
operation.updateOpt(request, operator);
@@ -217,8 +243,8 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public Boolean delete(Integer id, String sinkType, String operator) {
- LOGGER.info("begin to delete sink by id={}, sinkType={}", id,
sinkType);
+ public Boolean delete(Integer id, String operator) {
+ LOGGER.info("begin to delete sink by id={}", id);
Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
// Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
@@ -249,7 +275,7 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
commonOperateService.checkGroupStatus(groupId, operator);
Date now = new Date();
- List<StreamSinkEntity> entityList =
sinkMapper.selectByRelatedId(groupId, streamId);
+ List<StreamSinkEntity> entityList =
sinkMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isNotEmpty(entityList)) {
entityList.forEach(entity -> {
Integer id = entity.getId();
@@ -278,7 +304,7 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
// Check if it can be deleted
commonOperateService.checkGroupStatus(groupId, operator);
- List<StreamSinkEntity> entityList =
sinkMapper.selectByRelatedId(groupId, streamId);
+ List<StreamSinkEntity> entityList =
sinkMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isNotEmpty(entityList)) {
entityList.forEach(entity -> {
sinkMapper.deleteByPrimaryKey(entity.getId());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
index f5e9f907b..a29d34da4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
@@ -138,15 +138,14 @@ public class ClickHouseSinkOperation implements
StreamSinkOperation {
}
@Override
- public SinkResponse getById(@NotNull String sinkType, @NotNull Integer id)
{
- StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+ public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(),
SinkType.SINK_CLICKHOUSE, existType));
SinkResponse response = this.getFromEntity(entity,
ClickHouseSinkResponse::new);
- List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(id);
+ List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(entity.getId());
List<SinkFieldResponse> infos =
CommonBeanUtils.copyListProperties(entities,
SinkFieldResponse::new);
response.setFieldList(infos);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
index 01eeec022..5da3a1322 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
@@ -138,15 +138,14 @@ public class HiveSinkOperation implements
StreamSinkOperation {
}
@Override
- public SinkResponse getById(@NotNull String sinkType, @NotNull Integer id)
{
- StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+ public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(),
SinkType.SINK_HIVE, existType));
SinkResponse response = this.getFromEntity(entity,
HiveSinkResponse::new);
- List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(id);
+ List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(entity.getId());
List<SinkFieldResponse> infos =
CommonBeanUtils.copyListProperties(entities,
SinkFieldResponse::new);
response.setFieldList(infos);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
index 67be53419..f98798b48 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
@@ -132,15 +132,14 @@ public class IcebergSinkOperation implements
StreamSinkOperation {
}
@Override
- public SinkResponse getById(String sinkType, Integer id) {
- StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+ public SinkResponse getByEntity(StreamSinkEntity entity) {
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(),
SinkType.SINK_ICEBERG, existType));
SinkResponse response = this.getFromEntity(entity,
IcebergSinkResponse::new);
- List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(id);
+ List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(entity.getId());
List<SinkFieldResponse> infos =
CommonBeanUtils.copyListProperties(entities,
SinkFieldResponse::new);
response.setFieldList(infos);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
index 23e8a4b2e..0e9e041ff 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
@@ -136,14 +136,13 @@ public class KafkaSinkOperation implements
StreamSinkOperation {
}
@Override
- public SinkResponse getById(@NotNull String sinkType, @NotNull Integer id)
{
- StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+ public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(),
SinkType.SINK_KAFKA, existType));
SinkResponse response = this.getFromEntity(entity,
KafkaSinkResponse::new);
- List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(id);
+ List<StreamSinkFieldEntity> entities =
sinkFieldMapper.selectBySinkId(entity.getId());
List<SinkFieldResponse> infos =
CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
response.setFieldList(infos);
return response;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index 6382e6c7d..bcbce83f4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -44,6 +44,7 @@ import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.Objects;
/**
* Default operation of stream source.
@@ -86,8 +87,8 @@ public abstract class AbstractSourceOperation implements
StreamSourceOperation {
String sourceName = request.getSourceName();
List<StreamSourceEntity> existList =
sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
if (CollectionUtils.isNotEmpty(existList)) {
- String err = "stream source already exists with groupId=%s,
streamId=%s, sourceName=%s";
- throw new BusinessException(String.format(err, groupId, streamId,
sourceName));
+ String err = "source name=%s already exists with groupId=%s
streamId=%s";
+ throw new BusinessException(String.format(err, sourceName,
groupId, streamId));
}
StreamSourceEntity entity = CommonBeanUtils.copyProperties(request,
StreamSourceEntity::new);
@@ -112,15 +113,14 @@ public abstract class AbstractSourceOperation implements
StreamSourceOperation {
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.NOT_SUPPORTED)
- public SourceResponse getById(@NotNull Integer id) {
- StreamSourceEntity entity = sourceMapper.selectById(id);
+ public SourceResponse getByEntity(@NotNull StreamSourceEntity entity) {
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(),
getSourceType(), existType));
SourceResponse sourceResponse = this.getFromEntity(entity,
this::getResponse);
- List<StreamSourceFieldEntity> sourceFieldEntities =
sourceFieldMapper.selectBySourceId(id);
+ List<StreamSourceFieldEntity> sourceFieldEntities =
sourceFieldMapper.selectBySourceId(entity.getId());
List<InlongStreamFieldInfo> fieldInfos =
CommonBeanUtils.copyListProperties(sourceFieldEntities,
InlongStreamFieldInfo::new);
sourceResponse.setFieldList(fieldInfos);
@@ -133,9 +133,28 @@ public abstract class AbstractSourceOperation implements
StreamSourceOperation {
StreamSourceEntity entity =
sourceMapper.selectByIdForUpdate(request.getId());
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) {
- throw new BusinessException(String.format("Source=%s is not
allowed to update, "
+ throw new BusinessException(String.format("source=%s is not
allowed to update, "
+ "please wait until its changed to final status or stop /
frozen / delete it firstly", entity));
}
+
+ // Source type cannot be changed
+ if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
+ throw new BusinessException(String.format("source type=%s cannot
change to %s",
+ entity.getSourceType(), request.getSourceType()));
+ }
+
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
+ String sourceName = request.getSourceName();
+ List<StreamSourceEntity> sourceList =
sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
+ for (StreamSourceEntity sourceEntity : sourceList) {
+ Integer sourceId = sourceEntity.getId();
+ if (!Objects.equals(sourceId, request.getId())) {
+ String err = "source name=%s already exists with the
groupId=%s streamId=%s";
+ throw new BusinessException(String.format(err, sourceName,
groupId, streamId));
+ }
+ }
+
// Setting updated parameters of stream source entity.
setTargetEntity(request, entity);
entity.setVersion(entity.getVersion() + 1);
@@ -153,7 +172,7 @@ public abstract class AbstractSourceOperation implements
StreamSourceOperation {
SourceStatus curState = SourceStatus.forCode(existEntity.getStatus());
SourceStatus nextState = SourceStatus.TO_BE_ISSUED_FROZEN;
if (!SourceStatus.isAllowedTransition(curState, nextState)) {
- throw new BusinessException(String.format("Source=%s is not
allowed to stop", existEntity));
+ throw new BusinessException(String.format("source=%s is not
allowed to stop", existEntity));
}
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request,
StreamSourceEntity::new);
curEntity.setVersion(existEntity.getVersion() + 1);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
index c47e9e3e9..c17de31ce 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
@@ -48,12 +48,12 @@ public interface StreamSourceOperation {
Integer saveOpt(SourceRequest request, Integer groupStatus, String
operator);
/**
- * Get source info by source id.
+ * Get source info by the given entity.
*
- * @param id Source id.
+ * @param entity Get field value from the entity.
* @return Source info.
*/
- SourceResponse getById(Integer id);
+ SourceResponse getByEntity(StreamSourceEntity entity);
/**
* Get the target from the given entity.
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 2cd3de8ac..ade75d69a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -40,13 +40,12 @@ public interface StreamSourceService {
Integer save(SourceRequest request, String operator);
/**
- * Query source information based on id and type.
+ * Query source information based on id
*
* @param id source id.
- * @param sourceType Source type.
* @return Source info
*/
- SourceResponse get(Integer id, String sourceType);
+ SourceResponse get(Integer id);
/**
* Query source information based on inlong group id and inlong stream id.
@@ -98,31 +97,28 @@ public interface StreamSourceService {
* Delete the stream source by the given id and source type.
*
* @param id The primary key of the source.
- * @param sourceType Source type.
* @param operator Operator's name
* @return Whether succeed
*/
- boolean delete(Integer id, String sourceType, String operator);
+ boolean delete(Integer id, String operator);
/**
* Delete the stream source by the given id and source type.
*
* @param id The primary key of the source.
- * @param sourceType Source type.
* @param operator Operator's name
* @return Whether succeed
*/
- boolean restart(Integer id, String sourceType, String operator);
+ boolean restart(Integer id, String operator);
/**
* Delete the stream source by the given id and source type.
*
* @param id The primary key of the source.
- * @param sourceType Source type.
* @param operator Operator's name
* @return Whether succeed
*/
- boolean stop(Integer id, String sourceType, String operator);
+ boolean stop(Integer id, String operator);
/**
* Logically delete stream source with the given conditions.
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 2fe1a96b4..b2472233b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -87,9 +88,15 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
}
@Override
- public SourceResponse get(Integer id, String sourceType) {
- StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(sourceType));
- SourceResponse sourceResponse = operation.getById(id);
+ public SourceResponse get(Integer id) {
+ Preconditions.checkNotNull(id, "source id is empty");
+ StreamSourceEntity entity = sourceMapper.selectById(id);
+ if (entity == null) {
+ LOGGER.error("source not found by id={}", id);
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND);
+ }
+ StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
+ SourceResponse sourceResponse = operation.getByEntity(entity);
LOGGER.debug("success to get source by id={}", id);
return sourceResponse;
}
@@ -109,7 +116,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
return Collections.emptyList();
}
List<SourceResponse> responseList = new ArrayList<>();
- entityList.forEach(entity -> responseList.add(this.get(entity.getId(),
entity.getSourceType())));
+ entityList.forEach(entity ->
responseList.add(this.get(entity.getId())));
LOGGER.debug("success to list source by groupId={}, streamId={}",
groupId, streamId);
return responseList;
@@ -118,6 +125,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Override
public PageInfo<? extends SourceListResponse>
listByCondition(SourcePageRequest request) {
Preconditions.checkNotNull(request.getInlongGroupId(),
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
PageHelper.startPage(request.getPageNum(), request.getPageSize());
List<StreamSourceEntity> entityList =
sourceMapper.selectByCondition(request);
@@ -175,15 +183,15 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW,
isolation = Isolation.READ_COMMITTED)
- public boolean delete(Integer id, String sourceType, String operator) {
- LOGGER.info("begin to delete source by id={}, sourceType={}", id,
sourceType);
+ public boolean delete(Integer id, String operator) {
+ LOGGER.info("begin to delete source by id={}", id);
Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
commonOperateService.checkGroupStatus(entity.getInlongGroupId(),
operator);
- StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(sourceType));
+ StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
SourceRequest sourceRequest = new SourceRequest();
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
operation.deleteOpt(sourceRequest, operator);
@@ -195,13 +203,13 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW,
isolation = Isolation.READ_COMMITTED)
- public boolean restart(Integer id, String sourceType, String operator) {
- LOGGER.info("begin to restart source by id={}, sourceType={}", id,
sourceType);
+ public boolean restart(Integer id, String operator) {
+ LOGGER.info("begin to restart source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
commonOperateService.checkGroupStatus(entity.getInlongGroupId(),
operator);
- StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(sourceType));
+ StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
SourceRequest sourceRequest = new SourceRequest();
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
operation.restartOpt(sourceRequest, operator);
@@ -213,13 +221,13 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW,
isolation = Isolation.READ_COMMITTED)
- public boolean stop(Integer id, String sourceType, String operator) {
- LOGGER.info("begin to stop source by id={}, sourceType={}", id,
sourceType);
+ public boolean stop(Integer id, String operator) {
+ LOGGER.info("begin to stop source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
commonOperateService.checkGroupStatus(entity.getInlongGroupId(),
operator);
- StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(sourceType));
+ StreamSourceOperation operation =
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
SourceRequest sourceRequest = new SourceRequest();
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
operation.stopOpt(sourceRequest, operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
index 9674fcfe4..2e77e0e23 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
@@ -110,7 +110,7 @@ public abstract class AbstractSourceOperateListener
implements DataSourceOperate
} else {
log.warn("StreamSource={} cannot be operated for state={}",
sourceResponse, sourceStatus);
TimeUnit.SECONDS.sleep(5);
- sourceResponse =
streamSourceService.get(sourceResponse.getId(), sourceResponse.getSourceType());
+ sourceResponse =
streamSourceService.get(sourceResponse.getId());
}
}
SourceStatus sourceStatus =
SourceStatus.forCode(sourceResponse.getStatus());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
index 80b7b2420..651545d07 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
@@ -32,6 +32,6 @@ public class SourceDeleteListener extends
AbstractSourceOperateListener {
@Override
public void operateStreamSource(SourceRequest sourceRequest, String
operator) {
- streamSourceService.delete(sourceRequest.getId(),
sourceRequest.getSourceType(), operator);
+ streamSourceService.delete(sourceRequest.getId(), operator);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
index baa04f246..dab7b43c5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
@@ -30,6 +30,6 @@ public class SourceRestartListener extends
AbstractSourceOperateListener {
@Override
public void operateStreamSource(SourceRequest sourceRequest, String
operator) {
- streamSourceService.restart(sourceRequest.getId(),
sourceRequest.getSourceType(), operator);
+ streamSourceService.restart(sourceRequest.getId(), operator);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
index c675c8f89..369f0a236 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
@@ -30,6 +30,6 @@ public class SourceStopListener extends
AbstractSourceOperateListener {
@Override
public void operateStreamSource(SourceRequest sourceRequest, String
operator) {
- streamSourceService.stop(sourceRequest.getId(),
sourceRequest.getSourceType(), operator);
+ streamSourceService.stop(sourceRequest.getId(), operator);
}
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index e5cb74238..3723ce21e 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -70,7 +70,7 @@ public class AgentServiceTest extends ServiceBaseTest {
Boolean result = agentService.reportSnapshot(request);
Assert.assertTrue(result);
- sourceService.delete(id, SourceType.BINLOG.getType(), GLOBAL_OPERATOR);
+ sourceService.delete(id, GLOBAL_OPERATOR);
}
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
index 6203d4634..d53c95b77 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
@@ -67,14 +67,14 @@ public class ClickHouseStreamSinkServiceTest extends
ServiceBaseTest {
}
public void deleteKafkaSink(Integer sinkId) {
- boolean result = sinkService.delete(sinkId, SinkType.SINK_CLICKHOUSE,
globalOperator);
+ boolean result = sinkService.delete(sinkId, globalOperator);
Assert.assertTrue(result);
}
@Test
public void testListByIdentifier() {
Integer sinkId = this.saveSink("default1");
- SinkResponse sink = sinkService.get(sinkId, SinkType.SINK_CLICKHOUSE);
+ SinkResponse sink = sinkService.get(sinkId);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
deleteKafkaSink(sinkId);
}
@@ -82,7 +82,7 @@ public class ClickHouseStreamSinkServiceTest extends
ServiceBaseTest {
@Test
public void testGetAndUpdate() {
Integer sinkId = this.saveSink("default2");
- SinkResponse response = sinkService.get(sinkId,
SinkType.SINK_CLICKHOUSE);
+ SinkResponse response = sinkService.get(sinkId);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
ClickHouseSinkResponse kafkaSinkResponse = (ClickHouseSinkResponse)
response;
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
index 049ddf425..e8dae7372 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
@@ -62,7 +62,7 @@ public class HiveStreamSinkServiceTest extends
ServiceBaseTest {
Integer id = this.saveSink();
Assert.assertNotNull(id);
- boolean result = sinkService.delete(id, SinkType.SINK_HIVE,
globalOperator);
+ boolean result = sinkService.delete(id, globalOperator);
Assert.assertTrue(result);
}
@@ -70,16 +70,16 @@ public class HiveStreamSinkServiceTest extends
ServiceBaseTest {
public void testListByIdentifier() {
Integer id = this.saveSink();
- SinkResponse sink = sinkService.get(id, SinkType.SINK_HIVE);
+ SinkResponse sink = sinkService.get(id);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
- sinkService.delete(id, SinkType.SINK_HIVE, globalOperator);
+ sinkService.delete(id, globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSink();
- SinkResponse response = sinkService.get(id, SinkType.SINK_HIVE);
+ SinkResponse response = sinkService.get(id);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
HiveSinkResponse hiveResponse = (HiveSinkResponse) response;
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
index 92b118381..9bbe253b2 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
@@ -59,22 +59,22 @@ public class IcebergStreamSinkServiceTest extends
ServiceBaseTest {
public void testSaveAndDelete() {
Integer id = this.saveSink("default1");
Assert.assertNotNull(id);
- boolean result = sinkService.delete(id, SinkType.SINK_ICEBERG,
globalOperator);
+ boolean result = sinkService.delete(id, globalOperator);
Assert.assertTrue(result);
}
@Test
public void testListByIdentifier() {
Integer id = this.saveSink("default2");
- SinkResponse sink = sinkService.get(id, SinkType.SINK_ICEBERG);
+ SinkResponse sink = sinkService.get(id);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
- sinkService.delete(id, SinkType.SINK_ICEBERG, globalOperator);
+ sinkService.delete(id, globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSink("default3");
- SinkResponse response = sinkService.get(id, SinkType.SINK_ICEBERG);
+ SinkResponse response = sinkService.get(id);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
IcebergSinkResponse icebergSinkResponse = (IcebergSinkResponse)
response;
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
index c77d0e2e3..4f7079e61 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
@@ -64,14 +64,14 @@ public class KafkaStreamSinkServiceTest extends
ServiceBaseTest {
}
public void deleteKafkaSink(Integer kafkaSinkId) {
- boolean result = sinkService.delete(kafkaSinkId, SinkType.SINK_KAFKA,
globalOperator);
+ boolean result = sinkService.delete(kafkaSinkId, globalOperator);
Assert.assertTrue(result);
}
@Test
public void testListByIdentifier() {
Integer kafkaSinkId = this.saveSink("default1");
- SinkResponse sink = sinkService.get(kafkaSinkId, SinkType.SINK_KAFKA);
+ SinkResponse sink = sinkService.get(kafkaSinkId);
Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
deleteKafkaSink(kafkaSinkId);
}
@@ -79,7 +79,7 @@ public class KafkaStreamSinkServiceTest extends
ServiceBaseTest {
@Test
public void testGetAndUpdate() {
Integer kafkaSinkId = this.saveSink("default2");
- SinkResponse response = sinkService.get(kafkaSinkId,
SinkType.SINK_KAFKA);
+ SinkResponse response = sinkService.get(kafkaSinkId);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
KafkaSinkResponse kafkaSinkResponse = (KafkaSinkResponse) response;
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index 10682ca24..dff9e8f04 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -60,7 +60,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
Integer id = this.saveSource();
Assert.assertNotNull(id);
- boolean result = sourceService.delete(id, SourceType.BINLOG.getType(),
globalOperator);
+ boolean result = sourceService.delete(id, globalOperator);
Assert.assertTrue(result);
}
@@ -68,16 +68,16 @@ public class StreamSourceServiceTest extends
ServiceBaseTest {
public void testListByIdentifier() {
Integer id = this.saveSource();
- SourceResponse source = sourceService.get(id,
SourceType.BINLOG.getType());
+ SourceResponse source = sourceService.get(id);
Assert.assertEquals(globalGroupId, source.getInlongGroupId());
- sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
+ sourceService.delete(id, globalOperator);
}
@Test
public void testGetAndUpdate() {
Integer id = this.saveSource();
- SourceResponse response = sourceService.get(id,
SourceType.BINLOG.getType());
+ SourceResponse response = sourceService.get(id);
Assert.assertEquals(globalGroupId, response.getInlongGroupId());
BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -86,7 +86,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
boolean result = sourceService.update(request, globalOperator);
Assert.assertTrue(result);
- sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
+ sourceService.delete(id, globalOperator);
}
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 29d938b13..bf15aaf8e 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -21,7 +21,6 @@ import
org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
@@ -88,7 +87,7 @@ public class DataSourceListenerTest extends
WorkflowServiceImplTest {
WorkflowProcess process = context.getProcess();
WorkflowTask task = process.getTaskByName("stopSource");
Assert.assertTrue(task instanceof ServiceTask);
- SourceResponse sourceResponse = streamSourceService.get(sourceId,
SourceType.BINLOG.toString());
+ SourceResponse sourceResponse = streamSourceService.get(sourceId);
Assert.assertSame(SourceStatus.forCode(sourceResponse.getStatus()),
SourceStatus.TO_BE_ISSUED_FROZEN);
}
@@ -119,7 +118,7 @@ public class DataSourceListenerTest extends
WorkflowServiceImplTest {
WorkflowProcess process = context.getProcess();
WorkflowTask task = process.getTaskByName("restartSource");
Assert.assertTrue(task instanceof ServiceTask);
- SourceResponse sourceResponse = streamSourceService.get(sourceId,
SourceType.BINLOG.toString());
+ SourceResponse sourceResponse = streamSourceService.get(sourceId);
Assert.assertSame(SourceStatus.forCode(sourceResponse.getStatus()),
SourceStatus.SOURCE_NORMAL);
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 5f6314a36..1930fd71a 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.web.controller;
import com.github.pagehelper.PageInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
@@ -37,7 +36,6 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@@ -60,12 +58,9 @@ public class StreamSinkController {
@RequestMapping(value = "/get/{id}", method = RequestMethod.GET)
@ApiOperation(value = "Query sink information")
- @ApiImplicitParams({
- @ApiImplicitParam(name = "id", dataTypeClass = Integer.class,
required = true),
- @ApiImplicitParam(name = "sinkType", dataTypeClass = String.class,
required = true)
- })
- public Response<SinkResponse> get(@PathVariable Integer id, @RequestParam
String sinkType) {
- return Response.success(sinkService.get(id, sinkType));
+ @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required =
true)
+ public Response<SinkResponse> get(@PathVariable Integer id) {
+ return Response.success(sinkService.get(id));
}
@RequestMapping(value = "/list", method = RequestMethod.GET)
@@ -84,12 +79,9 @@ public class StreamSinkController {
@RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
@OperationLog(operation = OperationType.DELETE)
@ApiOperation(value = "Delete data sink information")
- @ApiImplicitParams({
- @ApiImplicitParam(name = "id", dataTypeClass = Integer.class,
required = true),
- @ApiImplicitParam(name = "sinkType", dataTypeClass = String.class,
required = true)
- })
- public Response<Boolean> delete(@PathVariable Integer id, @RequestParam
String sinkType) {
- boolean result = sinkService.delete(id, sinkType,
LoginUserUtils.getLoginUserDetail().getUserName());
+ @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required =
true)
+ public Response<Boolean> delete(@PathVariable Integer id) {
+ boolean result = sinkService.delete(id,
LoginUserUtils.getLoginUserDetail().getUserName());
return Response.success(result);
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index e486bdcaf..57c82216e 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.web.controller;
import com.github.pagehelper.PageInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
@@ -37,7 +36,6 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@@ -60,12 +58,9 @@ public class StreamSourceController {
@RequestMapping(value = "/get/{id}", method = RequestMethod.GET)
@ApiOperation(value = "Query stream source")
- @ApiImplicitParams({
- @ApiImplicitParam(name = "id", dataTypeClass = Integer.class,
required = true),
- @ApiImplicitParam(name = "sourceType", dataTypeClass =
String.class, required = true)
- })
- public Response<SourceResponse> get(@PathVariable Integer id,
@RequestParam String sourceType) {
- return Response.success(sourceService.get(id, sourceType));
+ @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required =
true)
+ public Response<SourceResponse> get(@PathVariable Integer id) {
+ return Response.success(sourceService.get(id));
}
@RequestMapping(value = "/list", method = RequestMethod.GET)
@@ -84,12 +79,9 @@ public class StreamSourceController {
@RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
@OperationLog(operation = OperationType.DELETE)
@ApiOperation(value = "Delete stream source")
- @ApiImplicitParams({
- @ApiImplicitParam(name = "id", dataTypeClass = Integer.class,
required = true),
- @ApiImplicitParam(name = "sourceType", dataTypeClass =
String.class, required = true)
- })
- public Response<Boolean> delete(@PathVariable Integer id, @RequestParam
String sourceType) {
- boolean result = sourceService.delete(id, sourceType,
LoginUserUtils.getLoginUserDetail().getUserName());
+ @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required =
true)
+ public Response<Boolean> delete(@PathVariable Integer id) {
+ boolean result = sourceService.delete(id,
LoginUserUtils.getLoginUserDetail().getUserName());
return Response.success(result);
}