This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6c688acb7 [INLONG-7004][Manager] Support to creating schema of
StreamSource by JSON (#7424)
6c688acb7 is described below
commit 6c688acb797933f4d377853db8e184c1731f9ca9
Author: fuweng11 <[email protected]>
AuthorDate: Mon Feb 27 11:08:43 2023 +0800
[INLONG-7004][Manager] Support to creating schema of StreamSource by JSON
(#7424)
---
.../manager/service/sink/StreamSinkService.java | 13 ++++-
.../service/sink/StreamSinkServiceImpl.java | 23 ++++++++
.../service/stream/InlongStreamService.java | 10 +++-
.../service/stream/InlongStreamServiceImpl.java | 23 ++++++++
.../manager/service/stream/InlongStreamTest.java | 67 ++++++++++++++++++++++
.../web/controller/InlongStreamController.java | 14 +++++
.../web/controller/StreamSinkController.java | 10 ++++
7 files changed, 156 insertions(+), 4 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 4f798806e..caa99873b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -22,17 +22,17 @@ import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
/**
* Service layer interface for stream sink
*/
@@ -123,7 +123,7 @@ public interface StreamSinkService {
* Paging query stream sink info based on conditions.
*
* @param request paging request
- * @param opInfo userinfo of operator
+ * @param opInfo userinfo of operator
* @return sink page list
*/
List<? extends StreamSink> listByCondition(SinkPageRequest request,
UserInfo opInfo);
@@ -245,4 +245,11 @@ public interface StreamSinkService {
*/
Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String
operator);
+ /**
+ * Converts a json string to a sinkFields
+ *
+ * @param fieldsJson JSON string for the field information
+ * @return list of sink field
+ */
+ List<SinkField> parseFields(String fieldsJson);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index bdd680084..d90ca8766 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.sink;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
@@ -90,6 +92,8 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
private StreamSinkFieldEntityMapper sinkFieldMapper;
@Autowired
private AutowireCapableBeanFactory autowireCapableBeanFactory;
+ @Autowired
+ private ObjectMapper objectMapper;
// To avoid circular dependencies, you cannot use @Autowired, it will be
injected by AutowireCapableBeanFactory
private InlongStreamProcessService streamProcessOperation;
@@ -683,6 +687,25 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return true;
}
+ @Override
+ public List<SinkField> parseFields(String fieldsJson) {
+ try {
+ Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
+ new TypeReference<Map<String, String>>() {
+ });
+ return fieldsMap.keySet().stream().map(fieldName -> {
+ SinkField field = new SinkField();
+ field.setFieldName(fieldName);
+ field.setFieldType(fieldsMap.get(fieldName));
+ return field;
+ }).collect(Collectors.toList());
+ } catch (Exception e) {
+ LOGGER.error("parse sink fields error", e);
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ String.format("parse sink fields error : %s",
e.getMessage()));
+ }
+ }
+
private void checkSinkRequestParams(SinkRequest request) {
// check request parameter
// check group id
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 01c23a880..51b15be14 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.UserInfo;
import java.util.List;
@@ -101,7 +102,7 @@ public interface InlongStreamService {
* Query inlong stream brief info list
*
* @param request query request
- * @param opInfo userinfo of operator
+ * @param opInfo userinfo of operator
* @return inlong stream brief list
*/
List<InlongStreamBriefInfo> listBrief(InlongStreamPageRequest request,
UserInfo opInfo);
@@ -230,4 +231,11 @@ public interface InlongStreamService {
*/
void logicDeleteDlqOrRlq(String bid, String topicName, String operator);
+ /**
+ * Converts a json string to a streamFields
+ *
+ * @param fieldsJson JSON string for the field information
+ * @return list of stream field
+ */
+ List<StreamField> parseFields(String fieldsJson);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 9418e3c3e..50a92486f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.stream;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import org.apache.commons.collections.CollectionUtils;
@@ -94,6 +96,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
private StreamSourceService sourceService;
@Autowired
private StreamSinkService sinkService;
+ @Autowired
+ private ObjectMapper objectMapper;
@Transactional(rollbackFor = Throwable.class)
@@ -720,6 +724,25 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
LOGGER.info("success to logic delete dlq or rlq by groupId={},
topicName={}", groupId, topicName);
}
+ @Override
+ public List<StreamField> parseFields(String fieldsJson) {
+ try {
+ Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
+ new TypeReference<Map<String, String>>() {
+ });
+ return fieldsMap.keySet().stream().map(fieldName -> {
+ StreamField field = new StreamField();
+ field.setFieldName(fieldName);
+ field.setFieldType(fieldsMap.get(fieldName));
+ return field;
+ }).collect(Collectors.toList());
+ } catch (Exception e) {
+ LOGGER.error("parse inlong stream fields error", e);
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ String.format("parse stream fields error : %s",
e.getMessage()));
+ }
+ }
+
/**
* Update field information
* <p/>
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
new file mode 100644
index 000000000..09b36f3b8
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.stream;
+
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InlongStreamTest extends ServiceBaseTest {
+
+ @Autowired
+ protected StreamSinkService streamSinkService;
+
+ @Test
+ public void testParseStreamFields() {
+ String streamFieldsJson =
"{\"name0\":\"string\",\"name1\":\"string\"}";
+ List<StreamField> expectStreamFields = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ StreamField field = new StreamField();
+ field.setFieldName("name" + i);
+ field.setFieldType("string");
+ expectStreamFields.add(field);
+ }
+ StreamField[] expectResult = expectStreamFields.toArray(new
StreamField[0]);
+ List<StreamField> streamFields =
streamService.parseFields(streamFieldsJson);
+ StreamField[] result = streamFields.toArray(new StreamField[0]);
+ Assertions.assertArrayEquals(expectResult, result);
+ }
+
+ @Test
+ public void testParseSinkFields() {
+ String sinkFieldsJson =
"{\"sinkFieldName0\":\"string\",\"sinkFieldName1\":\"string\"}";
+ List<SinkField> expectSinkFields = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ SinkField field = new SinkField();
+ field.setFieldName("sinkFieldName" + i);
+ field.setFieldType("string");
+ expectSinkFields.add(field);
+ }
+ SinkField[] expectResult = expectSinkFields.toArray(new SinkField[0]);
+ List<SinkField> sinkFields =
streamSinkService.parseFields(sinkFieldsJson);
+ SinkField[] result = sinkFields.toArray(new SinkField[0]);
+ Assertions.assertArrayEquals(expectResult, result);
+ }
+}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index d83b5f11a..7f9b86f96 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.web.controller;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@@ -29,6 +30,7 @@ import
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
import org.apache.inlong.manager.service.operationlog.OperationLog;
import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
@@ -43,6 +45,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import java.util.List;
+
/**
* Inlong stream control layer
*/
@@ -51,6 +55,9 @@ import org.springframework.web.bind.annotation.RestController;
@Api(tags = "Inlong-Stream-API")
public class InlongStreamController {
+ @Autowired
+ private ObjectMapper objectMapper;
+
@Autowired
private InlongStreamService streamService;
@Autowired
@@ -169,4 +176,11 @@ public class InlongStreamController {
return Response.success(streamService.delete(groupId, streamId,
username));
}
+ @RequestMapping(value = "/stream/parseFields", method = RequestMethod.POST)
+ @ApiOperation(value = "Parse inlong stream fields from JSON string")
+ @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class,
required = true)
+ public Response<List<StreamField>> parseFields(@RequestBody String
fieldsJson) {
+ return Response.success(streamService.parseFields(fieldsJson));
+ }
+
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 949e943ba..1b3641285 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -27,6 +27,7 @@ import
org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -42,6 +43,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import java.util.List;
+
/**
* Stream sink control layer
*/
@@ -115,4 +118,11 @@ public class StreamSinkController {
return Response.success(sinkService.deleteByKey(groupId, streamId,
name, startProcess, username));
}
+ @RequestMapping(value = "/sink/parseFields", method = RequestMethod.POST)
+ @ApiOperation(value = "parse stream sink fields from JSON string")
+ @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class,
required = true)
+ public Response<List<SinkField>> parseFields(@RequestBody String
fieldsJson) {
+ return Response.success(sinkService.parseFields(fieldsJson));
+ }
+
}