This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c688acb7 [INLONG-7004][Manager] Support to creating schema of 
StreamSource by JSON (#7424)
6c688acb7 is described below

commit 6c688acb797933f4d377853db8e184c1731f9ca9
Author: fuweng11 <[email protected]>
AuthorDate: Mon Feb 27 11:08:43 2023 +0800

    [INLONG-7004][Manager] Support to creating schema of StreamSource by JSON 
(#7424)
---
 .../manager/service/sink/StreamSinkService.java    | 13 ++++-
 .../service/sink/StreamSinkServiceImpl.java        | 23 ++++++++
 .../service/stream/InlongStreamService.java        | 10 +++-
 .../service/stream/InlongStreamServiceImpl.java    | 23 ++++++++
 .../manager/service/stream/InlongStreamTest.java   | 67 ++++++++++++++++++++++
 .../web/controller/InlongStreamController.java     | 14 +++++
 .../web/controller/StreamSinkController.java       | 10 ++++
 7 files changed, 156 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 4f798806e..caa99873b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -22,17 +22,17 @@ import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
 /**
  * Service layer interface for stream sink
  */
@@ -123,7 +123,7 @@ public interface StreamSinkService {
      * Paging query stream sink info based on conditions.
      *
      * @param request paging request
-     * @param opInfo  userinfo of operator
+     * @param opInfo userinfo of operator
      * @return sink page list
      */
     List<? extends StreamSink> listByCondition(SinkPageRequest request, 
UserInfo opInfo);
@@ -245,4 +245,11 @@ public interface StreamSinkService {
      */
     Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String 
operator);
 
+    /**
+     * Parses a JSON object that maps field names to field types into a list of sink fields.
+     *
+     * @param fieldsJson JSON string for the field information
+     * @return list of sink field
+     */
+    List<SinkField> parseFields(String fieldsJson);
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index bdd680084..d90ca8766 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.sink;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.google.common.collect.Lists;
@@ -90,6 +92,8 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     private StreamSinkFieldEntityMapper sinkFieldMapper;
     @Autowired
     private AutowireCapableBeanFactory autowireCapableBeanFactory;
+    @Autowired
+    private ObjectMapper objectMapper;
 
     // To avoid circular dependencies, you cannot use @Autowired, it will be 
injected by AutowireCapableBeanFactory
     private InlongStreamProcessService streamProcessOperation;
@@ -683,6 +687,25 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         return true;
     }
 
+    @Override
+    public List<SinkField> parseFields(String fieldsJson) {
+        try {
+            Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
+                    new TypeReference<Map<String, String>>() {});
+            // walk the entries (not keySet + get) so each name/type pair is read in a single pass
+            return fieldsMap.entrySet().stream().map(entry -> {
+                SinkField field = new SinkField();
+                field.setFieldName(entry.getKey());
+                field.setFieldType(entry.getValue());
+                return field;
+            }).collect(Collectors.toList());
+        } catch (Exception e) {
+            LOGGER.error("parse sink fields error", e);
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    String.format("parse sink fields error : %s", e.getMessage()));
+        }
+    }
+
     private void checkSinkRequestParams(SinkRequest request) {
         // check request parameter
         // check group id
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 01c23a880..51b15be14 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -23,6 +23,7 @@ import 
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 
 import java.util.List;
@@ -101,7 +102,7 @@ public interface InlongStreamService {
      * Query inlong stream brief info list
      *
      * @param request query request
-     * @param opInfo  userinfo of operator
+     * @param opInfo userinfo of operator
      * @return inlong stream brief list
      */
     List<InlongStreamBriefInfo> listBrief(InlongStreamPageRequest request, 
UserInfo opInfo);
@@ -230,4 +231,11 @@ public interface InlongStreamService {
      */
     void logicDeleteDlqOrRlq(String bid, String topicName, String operator);
 
+    /**
+     * Parses a JSON object that maps field names to field types into a list of stream fields.
+     *
+     * @param fieldsJson JSON string for the field information
+     * @return list of stream field
+     */
+    List<StreamField> parseFields(String fieldsJson);
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 9418e3c3e..50a92486f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.stream;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import org.apache.commons.collections.CollectionUtils;
@@ -94,6 +96,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
     private StreamSourceService sourceService;
     @Autowired
     private StreamSinkService sinkService;
+    @Autowired
+    private ObjectMapper objectMapper;
 
     @Transactional(rollbackFor = Throwable.class)
 
@@ -720,6 +724,25 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         LOGGER.info("success to logic delete dlq or rlq by groupId={}, 
topicName={}", groupId, topicName);
     }
 
+    @Override
+    public List<StreamField> parseFields(String fieldsJson) {
+        try {
+            Map<String, String> fieldsMap = objectMapper.readValue(fieldsJson,
+                    new TypeReference<Map<String, String>>() {});
+            // walk the entries (not keySet + get) so each name/type pair is read in a single pass
+            return fieldsMap.entrySet().stream().map(entry -> {
+                StreamField field = new StreamField();
+                field.setFieldName(entry.getKey());
+                field.setFieldType(entry.getValue());
+                return field;
+            }).collect(Collectors.toList());
+        } catch (Exception e) {
+            LOGGER.error("parse inlong stream fields error", e);
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    String.format("parse stream fields error : %s", e.getMessage()));
+        }
+    }
+
     /**
      * Update field information
      * <p/>
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
new file mode 100644
index 000000000..09b36f3b8
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.stream;
+
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InlongStreamTest extends ServiceBaseTest {
+
+    @Autowired
+    protected StreamSinkService streamSinkService;
+
+    // A two-entry JSON object must be parsed into two stream fields, in the order they appear
+    @Test
+    public void testParseStreamFields() {
+        String streamFieldsJson = "{\"name0\":\"string\",\"name1\":\"string\"}";
+        List<StreamField> expected = new ArrayList<>();
+        for (int idx = 0; idx < 2; idx++) {
+            StreamField streamField = new StreamField();
+            streamField.setFieldName("name" + idx);
+            streamField.setFieldType("string");
+            expected.add(streamField);
+        }
+        // list equality is checked element by element, so no array conversion is required
+        List<StreamField> actual = streamService.parseFields(streamFieldsJson);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    // A two-entry JSON object must be parsed into two sink fields, in the order they appear
+    @Test
+    public void testParseSinkFields() {
+        String sinkFieldsJson = "{\"sinkFieldName0\":\"string\",\"sinkFieldName1\":\"string\"}";
+        List<SinkField> expected = new ArrayList<>();
+        for (int idx = 0; idx < 2; idx++) {
+            SinkField sinkField = new SinkField();
+            sinkField.setFieldName("sinkFieldName" + idx);
+            sinkField.setFieldType("string");
+            expected.add(sinkField);
+        }
+        // list equality is checked element by element, so no array conversion is required
+        List<SinkField> actual = streamSinkService.parseFields(sinkFieldsJson);
+        Assertions.assertEquals(expected, actual);
+    }
+}
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index d83b5f11a..7f9b86f96 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.web.controller;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
@@ -29,6 +30,7 @@ import 
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
@@ -43,6 +45,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+
 /**
  * Inlong stream control layer
  */
@@ -51,6 +55,9 @@ import org.springframework.web.bind.annotation.RestController;
 @Api(tags = "Inlong-Stream-API")
 public class InlongStreamController {
 
+    @Autowired
+    private ObjectMapper objectMapper;
+
     @Autowired
     private InlongStreamService streamService;
     @Autowired
@@ -169,4 +176,11 @@ public class InlongStreamController {
         return Response.success(streamService.delete(groupId, streamId, 
username));
     }
 
+    @RequestMapping(value = "/stream/parseFields", method = RequestMethod.POST)
+    @ApiOperation(value = "Parse inlong stream fields from JSON string")
+    @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class, 
required = true)
+    public Response<List<StreamField>> parseFields(@RequestBody String 
fieldsJson) {
+        return Response.success(streamService.parseFields(fieldsJson));
+    }
+
 }
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 949e943ba..1b3641285 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -27,6 +27,7 @@ import 
org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -42,6 +43,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+
 /**
  * Stream sink control layer
  */
@@ -115,4 +118,11 @@ public class StreamSinkController {
         return Response.success(sinkService.deleteByKey(groupId, streamId, 
name, startProcess, username));
     }
 
+    @RequestMapping(value = "/sink/parseFields", method = RequestMethod.POST)
+    @ApiOperation(value = "parse stream sink fields from JSON string")
+    @ApiImplicitParam(name = "fieldsJson", dataTypeClass = String.class, 
required = true)
+    public Response<List<SinkField>> parseFields(@RequestBody String 
fieldsJson) {
+        return Response.success(sinkService.parseFields(fieldsJson));
+    }
+
 }

Reply via email to