This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0cd2e2657 [INLONG-6086][Manager] Support updating and deleting stream
sink by key (#6087)
0cd2e2657 is described below
commit 0cd2e26572cd8f6dc2703aefc415bf9df490317f
Author: vernedeng <[email protected]>
AuthorDate: Sun Oct 16 17:04:13 2022 +0800
[INLONG-6086][Manager] Support updating and deleting stream sink by key
(#6087)
---
.../manager/client/api/service/StreamSinkApi.java | 10 +++
.../manager/service/sink/StreamSinkService.java | 19 +++++
.../service/sink/StreamSinkServiceImpl.java | 87 ++++++++++++++++++++++
.../manager/service/sink/HiveSinkServiceTest.java | 26 +++++++
.../web/controller/StreamSinkController.java | 27 +++++++
5 files changed, 169 insertions(+)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 9324c85f1..505073dd6 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.service;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import retrofit2.Call;
@@ -37,9 +38,18 @@ public interface StreamSinkApi {
@POST("sink/update")
Call<Response<Boolean>> updateSink(@Body SinkRequest request);
+ @POST("sink/updateByKey")
+ Call<Response<UpdateResult>> updateSinkByKey(@Body SinkRequest request); // sends the request as the POST body; the response wraps an UpdateResult
+
@DELETE("sink/delete/{id}")
Call<Response<Boolean>> deleteSink(@Path("id") Integer id);
+ @DELETE("sink/deleteByKey")
+ Call<Response<Boolean>> deleteSink( // overload of deleteSink(Integer): the sink is identified by three query parameters instead of its id
+ @Query("groupId") String groupId,
+ @Query("streamId") String streamId,
+ @Query("name") String name);
+
@GET("sink/list")
Call<Response<PageResult<StreamSink>>> listSinks(@Query("inlongGroupId")
String groupId,
@Query("inlongStreamId") String streamId, @Query("sinkType")
String sinkType);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 6b09fd551..5ce554c85 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -104,6 +105,15 @@ public interface StreamSinkService {
*/
Boolean update(SinkRequest sinkRequest, String operator);
+ /**
+ * Modify data sink information by key, where the key is the group id,
+ * stream id and sink name carried by the request (not the sink's numeric id).
+ *
+ * @param sinkRequest Information that needs to be modified; it also supplies the key.
+ * @param operator Operator's name.
+ * @return Update result describing the sink that was updated.
+ */
+ UpdateResult updateByKey(SinkRequest sinkRequest, String operator);
+
/**
* Modify sink data status.
*
@@ -122,6 +132,15 @@ public interface StreamSinkService {
*/
Boolean delete(Integer id, String operator);
+ /**
+ * Delete the stream sink by given group id, stream id, and sink name.
+ *
+ * @param groupId The group id of sink
+ * @param streamId The stream id of sink
+ * @param name The name of sink
+ * @param operator Operator's name
+ * @return Whether the deletion succeeded
+ */
+ Boolean deleteByKey(String groupId, String streamId, String name, String
operator);
+
/**
* Logically delete stream sink with the given conditions.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index e2572651d..4094cf5c8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -37,6 +37,7 @@ import
org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -270,6 +271,60 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return true;
}
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public UpdateResult updateByKey(SinkRequest request, String operator) { // updates the one sink matching the group id, stream id and sink name taken from the request
+ LOGGER.info("begin to update sink info: {}", request);
+ this.checkParams(request);
+ // Check if it can be modified
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
+ String sinkName = request.getSinkName();
+ groupCheckService.checkGroupStatus(groupId, operator);
+
+ // Check whether the stream exists or not
+ InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
+ Preconditions.checkNotNull(streamEntity,
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
+
+ // Check whether the sink name exists with the same groupId and
streamId, and only one row
+ List<StreamSinkEntity> sinkList =
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+ if (CollectionUtils.isEmpty(sinkList)) {
+ String errMsg = String.format("can not find stream sink with
group=%s, stream=%s, sinkName=%s",
+ groupId, streamId, sinkName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg); // no sink matched the key
+ }
+
+ if (sinkList.size() != 1) { // the list is non-empty here, so this branch means more than one match
+ String errMsg = String.format("find %d stream sink with group=%s,
stream=%s, sinkName=%s, "
+ + "but only except 1", sinkList.size(), groupId, streamId,
sinkName); // NOTE(review): the word except in this message is presumably a typo for expect - confirm before changing the runtime string
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ StreamSinkEntity entity = sinkList.get(0);
+ request.setId(entity.getId()); // copy the resolved sink id onto the request before handing it to the operator
+ SinkStatus nextStatus = null; // stays null unless the stream status is CONFIG_SUCCESSFUL or CONFIG_FAILED
+ boolean streamSuccess =
StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
+ if (streamSuccess ||
StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
+ nextStatus = SinkStatus.CONFIG_ING;
+ }
+ StreamSinkOperator sinkOperator =
operatorFactory.getInstance(request.getSinkType()); // the operator is chosen by the sink type given in the request
+ sinkOperator.updateOpt(request, nextStatus, operator);
+
+ // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the
[CREATE_STREAM_RESOURCE] process
+ if (streamSuccess) {
+ // To work around the circular reference check we manually
instantiate and wire
+ if (streamProcessOperation == null) { // created only on first use and then kept in the field
+ streamProcessOperation = new InlongStreamProcessService();
+
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+ }
+ streamProcessOperation.startProcess(groupId, streamId, operator,
false);
+ }
+ LOGGER.info("success to update sink info: {}", request);
+ return new UpdateResult(entity.getId(), true, request.getVersion() +
1); // NOTE(review): reports the request version plus one; presumably updateOpt bumps the stored version by exactly one, and getVersion() is assumed non-null here - confirm
+ }
+
@Override
public void updateStatus(int id, int status, String log) {
StreamSinkEntity entity = new StreamSinkEntity();
@@ -295,6 +350,38 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return true;
}
+ @Transactional(rollbackFor = Throwable.class)
+ @Override
+ public Boolean deleteByKey(String groupId, String streamId, String
sinkName, String operator) { // deletes the one sink matching group id, stream id and sink name; throws BusinessException when zero or several match
+ LOGGER.info("begin to delete sink by group id={}, stream id={},
name={}", groupId, streamId, sinkName);
+ Preconditions.checkNotNull(groupId,
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); // NOTE(review): this and the two checks below call checkNotNull, so presumably empty strings pass although the messages speak of empty values - confirm
+ Preconditions.checkNotNull(streamId,
ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(sinkName, "stream sink name is empty or
null");
+
+ // Check whether the sink name exists with the same groupId and
streamId, and only one row
+ List<StreamSinkEntity> sinkList =
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+ if (CollectionUtils.isEmpty(sinkList)) {
+ String errMsg = String.format("can not find stream sink with
group=%s, stream=%s, sinkName=%s",
+ groupId, streamId, sinkName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg); // no sink matched the key
+ }
+
+ if (sinkList.size() != 1) { // the list is non-empty here, so this branch means more than one match
+ String errMsg = String.format("find %d stream sink with group=%s,
stream=%s, sinkName=%s, "
+ + "but only except 1", sinkList.size(), groupId, streamId,
sinkName); // NOTE(review): the word except in this message is presumably a typo for expect - confirm before changing the runtime string
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ StreamSinkEntity entity = sinkList.get(0);
+ groupCheckService.checkGroupStatus(entity.getInlongGroupId(),
operator); // the group status check runs only after the sink has been resolved
+ StreamSinkOperator sinkOperator =
operatorFactory.getInstance(entity.getSinkType()); // the operator is chosen by the stored sink type
+ sinkOperator.deleteOpt(entity, operator);
+ LOGGER.info("success to delete sink info: {}", entity);
+ return true; // every failure path above throws, so reaching here always yields true
+ }
+
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean logicDeleteAll(String groupId, String streamId, String
operator) {
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index 6197d4e81..9b1421c83 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
@@ -70,6 +71,15 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
Assertions.assertTrue(result);
}
+ @Test
+ public void testSaveAndDeleteByUniqueKey() { // saves a sink, then deletes it by group id, stream id and sink name
+ Integer id = this.saveSink();
+ Assertions.assertNotNull(id);
+
+ boolean result = sinkService.deleteByKey(globalGroupId,
globalStreamId, sinkName, globalOperator);
+ Assertions.assertTrue(result);
+ }
+
@Test
public void testListByIdentifier() {
Integer id = this.saveSink();
@@ -95,4 +105,20 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
sinkService.delete(sinkId, globalOperator);
}
+ @Test
+ public void testGetAndUpdateByUniqueKey() { // saves a sink, reads it back, updates it through updateByKey and checks the reported version
+ Integer sinkId = this.saveSink();
+ StreamSink streamSink = sinkService.get(sinkId);
+ Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+
+ HiveSink sink = (HiveSink) streamSink;
+ sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+ HiveSinkRequest request = CommonBeanUtils.copyProperties(sink,
HiveSinkRequest::new); // the request is built from the sink that was read back
+ UpdateResult result = sinkService.updateByKey(request, globalOperator);
+ Assertions.assertTrue(result.getSuccess());
+ Assertions.assertEquals(request.getVersion() + 1,
result.getVersion().intValue()); // the reported version must be the request version plus one
+
+ sinkService.delete(sinkId, globalOperator); // clean up through the id-based delete
+ }
+
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index b6a8b3de1..74cdf660c 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -19,11 +19,13 @@ package org.apache.inlong.manager.web.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -36,6 +38,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@@ -76,6 +79,13 @@ public class StreamSinkController {
return Response.success(sinkService.update(request,
LoginUserUtils.getLoginUser().getName()));
}
+ @RequestMapping(value = "/sink/updateByKey", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Update stream sink by key")
+ public Response<UpdateResult> updateByKey(@RequestBody SinkRequest
request) { // delegates to sinkService.updateByKey with the logged-in user's name as operator
+ return Response.success(sinkService.updateByKey(request,
LoginUserUtils.getLoginUser().getName()));
+ }
+
@RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE)
@OperationLog(operation = OperationType.DELETE)
@ApiOperation(value = "Delete stream sink")
@@ -85,4 +95,21 @@ public class StreamSinkController {
return Response.success(result);
}
+ @RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE)
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiOperation(value = "Delete stream sink by key")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class,
required = true),
+ @ApiImplicitParam(name = "streamId", dataTypeClass = String.class,
required = true),
+ @ApiImplicitParam(name = "name", dataTypeClass = String.class,
required = true)
+ })
+ public Response<Boolean> deleteByKey( // delegates to sinkService.deleteByKey with the logged-in user's name as operator
+ @RequestParam String groupId,
+ @RequestParam String streamId,
+ @RequestParam String name) { // the three request parameters together identify the sink to delete
+ boolean result = sinkService.deleteByKey(groupId, streamId, name,
+ LoginUserUtils.getLoginUser().getName());
+ return Response.success(result);
+ }
+
}