This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a763bd5c3 [INLONG-6652][Manager] Support MQTT source (#6687)
a763bd5c3 is described below

commit a763bd5c3edcdae8717e59984f2b0241ad20cdea
Author: Lizhen <[email protected]>
AuthorDate: Fri Dec 2 10:01:49 2022 +0800

    [INLONG-6652][Manager] Support MQTT source (#6687)
---
 .../inlong/manager/common/consts/SourceType.java   |  2 +
 .../manager/pojo/source/mqtt/MqttSource.java       | 72 ++++++++++++++++
 .../manager/pojo/source/mqtt/MqttSourceDTO.java    | 77 +++++++++++++++++
 .../pojo/source/mqtt/MqttSourceRequest.java        | 61 +++++++++++++
 .../service/source/mqtt/MqttSourceOperator.java    | 81 ++++++++++++++++++
 .../service/source/MqttSourceServiceTest.java      | 99 ++++++++++++++++++++++
 6 files changed, 392 insertions(+)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 4d6d3a4c1..ce01cafcc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -40,6 +40,7 @@ public class SourceType {
     public static final String SQLSERVER = "SQLSERVER";
     public static final String MONGODB = "MONGODB";
     public static final String REDIS = "REDIS";
+    public static final String MQTT = "MQTT";
 
     public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new HashMap<String, TaskTypeEnum>() {
 
@@ -57,6 +58,7 @@ public class SourceType {
             put(SQLSERVER, TaskTypeEnum.SQLSERVER);
             put(MONGODB, TaskTypeEnum.MONGODB);
             put(REDIS, TaskTypeEnum.REDIS);
+            put(MQTT, TaskTypeEnum.MQTT);
 
         }
     };
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSource.java
new file mode 100644
index 000000000..8b92911eb
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.mqtt;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+/**
+ * MQTT stream source info.
+ * <p>
+ * Extends {@link StreamSource} with the MQTT connection settings declared below and is annotated
+ * with {@link JsonTypeDefine} using {@link SourceType#MQTT} as its value.
+ * <p>
+ * NOTE(review): Lombok documents that {@code @SuperBuilder} ignores field initializers unless
+ * {@code @Builder.Default} is present, so builder-created instances presumably get {@code qos == 0}
+ * and do not run the no-arg constructor's {@code setSourceType} call - confirm before relying
+ * on the builder for defaults.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Mqtt source info")
+@JsonTypeDefine(value = SourceType.MQTT)
+public class MqttSource extends StreamSource {
+
+    @ApiModelProperty("ServerURI of the Mqtt server")
+    private String serverURI;
+
+    @ApiModelProperty("Username of the Mqtt server")
+    private String username;
+
+    @ApiModelProperty("Password of the Mqtt server")
+    private String password;
+
+    @ApiModelProperty("Topic of the Mqtt server")
+    private String topic;
+
+    // Initialized to 1 for instances created through the no-arg constructor.
+    @ApiModelProperty("Mqtt qos")
+    private int qos = 1;
+
+    @ApiModelProperty("Client Id")
+    private String clientId;
+
+    @ApiModelProperty("Mqtt version")
+    private String mqttVersion;
+
+    /**
+     * Creates an MQTT source whose source type is preset to {@link SourceType#MQTT}.
+     */
+    public MqttSource() {
+        this.setSourceType(SourceType.MQTT);
+    }
+
+    /**
+     * Generates the request object for this source.
+     *
+     * @return a new {@link MqttSourceRequest} produced by
+     *         {@link CommonBeanUtils#copyProperties} from this instance
+     */
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, MqttSourceRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceDTO.java
new file mode 100644
index 000000000..9079200e0
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceDTO.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.mqtt;
+
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+/**
+ * MQTT source settings in the form that is serialized to, and parsed back from, JSON ext params.
+ * <p>
+ * Declares the same MQTT fields as {@link MqttSourceRequest}.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MqttSourceDTO {
+
+    @ApiModelProperty("ServerURI of the Mqtt server")
+    private String serverURI;
+
+    @ApiModelProperty("Username of the Mqtt server")
+    private String username;
+
+    @ApiModelProperty("Password of the Mqtt server")
+    private String password;
+
+    @ApiModelProperty("Topic of the Mqtt server")
+    private String topic;
+
+    // Without @Builder.Default Lombok's builder ignores the initializer and builds qos = 0.
+    // NOTE(review): the generated no-arg constructor honors the default only on Lombok 1.18.2+ - confirm.
+    @Builder.Default
+    @ApiModelProperty("Mqtt qos")
+    private int qos = 1;
+
+    @ApiModelProperty("Client Id")
+    private String clientId;
+
+    @ApiModelProperty("Mqtt version")
+    private String mqttVersion;
+
+    /**
+     * Builds a DTO from the MQTT-specific fields of the given request.
+     *
+     * @param request MQTT source request to read from
+     * @return a DTO holding the request's server URI, credentials, topic, qos, client id and version
+     */
+    public static MqttSourceDTO getFromRequest(MqttSourceRequest request) {
+        return MqttSourceDTO.builder()
+                .serverURI(request.getServerURI())
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .topic(request.getTopic())
+                .qos(request.getQos())
+                .clientId(request.getClientId())
+                .mqttVersion(request.getMqttVersion())
+                .build();
+    }
+
+    /**
+     * Parses a DTO from its JSON representation.
+     *
+     * @param extParams JSON string to parse
+     * @return the parsed DTO
+     * @throws BusinessException if parsing fails; the message is the
+     *         {@code SOURCE_INFO_INCORRECT} text followed by the original exception message
+     */
+    public static MqttSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, MqttSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceRequest.java
new file mode 100644
index 000000000..3a00aedd4
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.mqtt;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+/**
+ * Request for an MQTT stream source.
+ * <p>
+ * Extends {@link SourceRequest} with the MQTT connection settings declared below and is annotated
+ * with {@link JsonTypeDefine} using {@link SourceType#MQTT} as its value.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "MQTT source request")
+@JsonTypeDefine(value = SourceType.MQTT)
+public class MqttSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("ServerURI of the Mqtt server")
+    private String serverURI;
+
+    @ApiModelProperty("Username of the Mqtt server")
+    private String username;
+
+    @ApiModelProperty("Password of the Mqtt server")
+    private String password;
+
+    @ApiModelProperty("Topic of the Mqtt server")
+    private String topic;
+
+    // Initialized to 1 when the request does not set it explicitly.
+    @ApiModelProperty("Mqtt qos")
+    private int qos = 1;
+
+    @ApiModelProperty("Client Id")
+    private String clientId;
+
+    @ApiModelProperty("Mqtt version")
+    private String mqttVersion;
+
+    /**
+     * Creates an MQTT source request whose source type is preset to {@link SourceType#MQTT}.
+     */
+    public MqttSourceRequest() {
+        this.setSourceType(SourceType.MQTT);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
new file mode 100644
index 000000000..a573c9a85
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.mqtt;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSourceDTO;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Source operator for MQTT stream sources.
+ */
+@Service
+public class MqttSourceOperator extends AbstractSourceOperator {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tells whether this operator handles the given source type.
+     *
+     * @param sourceType source type to check
+     * @return true only when the type equals {@link SourceType#MQTT}
+     */
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.MQTT.equals(sourceType);
+    }
+
+    /**
+     * @return {@link SourceType#MQTT}
+     */
+    @Override
+    protected String getSourceType() {
+        return SourceType.MQTT;
+    }
+
+    /**
+     * Copies the request into the entity and stores the MQTT-specific fields as JSON ext params.
+     *
+     * @param request source request, cast to {@link MqttSourceRequest}
+     * @param targetEntity entity to populate
+     * @throws BusinessException if building or serializing the DTO fails
+     */
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        MqttSourceRequest mqttRequest = (MqttSourceRequest) request;
+        CommonBeanUtils.copyProperties(mqttRequest, targetEntity, true);
+        try {
+            String extParams = objectMapper.writeValueAsString(MqttSourceDTO.getFromRequest(mqttRequest));
+            targetEntity.setExtParams(extParams);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+    /**
+     * Converts a stored entity into an {@link MqttSource}.
+     *
+     * @param entity stored source entity, may be null
+     * @return a fresh {@link MqttSource}; when the entity is non-null it carries the entity's
+     *         properties, then the properties parsed from its ext params, and its source fields
+     */
+    @Override
+    public StreamSource getFromEntity(StreamSourceEntity entity) {
+        MqttSource mqttSource = new MqttSource();
+        if (entity != null) {
+            MqttSourceDTO extDto = MqttSourceDTO.getFromJson(entity.getExtParams());
+            // Entity first, DTO second: the same copy order as before.
+            CommonBeanUtils.copyProperties(entity, mqttSource, true);
+            CommonBeanUtils.copyProperties(extDto, mqttSource, true);
+
+            List<StreamField> fields = super.getSourceFields(entity.getId());
+            mqttSource.setFieldList(fields);
+        }
+        return mqttSource;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/MqttSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/MqttSourceServiceTest.java
new file mode 100644
index 000000000..83fca51b7
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/MqttSourceServiceTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSourceRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Tests saving, reading, updating and deleting an MQTT source through {@link StreamSourceService}.
+ */
+public class MqttSourceServiceTest extends ServiceBaseTest {
+
+    private static final String serverURI = "mqtt://broker.emqx.io:1883";
+    private static final String username = "inlong";
+    private static final String password = "123456";
+    private static final String clientId = "mqttx_8f19ed2a";
+    private static final String mqttVersion = "5.0";
+    private final String sourceName = "stream_source_service_test";
+    @Autowired
+    private StreamSourceService sourceService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Saves the stream, then saves an MQTT source built from the constants above.
+     *
+     * @return the value returned by {@code sourceService.save}
+     */
+    public Integer saveSource() {
+        streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
+
+        MqttSourceRequest request = new MqttSourceRequest();
+        request.setInlongGroupId(GLOBAL_GROUP_ID);
+        request.setInlongStreamId(GLOBAL_STREAM_ID);
+        request.setSourceName(sourceName);
+        request.setSourceType(SourceType.MQTT);
+        request.setServerURI(serverURI);
+        request.setUsername(username);
+        request.setPassword(password);
+        request.setClientId(clientId);
+        request.setMqttVersion(mqttVersion);
+
+        return sourceService.save(request, GLOBAL_OPERATOR);
+    }
+
+    /**
+     * A saved source gets a non-null id and can be deleted.
+     */
+    @Test
+    public void testSaveAndDelete() {
+        Integer sourceId = this.saveSource();
+        Assertions.assertNotNull(sourceId);
+
+        boolean deleted = sourceService.delete(sourceId, GLOBAL_OPERATOR);
+        Assertions.assertTrue(deleted);
+    }
+
+    /**
+     * A saved source can be fetched by id and reports the group id it was saved with.
+     */
+    @Test
+    public void testListByIdentifier() {
+        Integer sourceId = this.saveSource();
+        StreamSource fetched = sourceService.get(sourceId);
+        Assertions.assertEquals(GLOBAL_GROUP_ID, fetched.getInlongGroupId());
+
+        sourceService.delete(sourceId, GLOBAL_OPERATOR);
+    }
+
+    /**
+     * A fetched source can be copied into a request and submitted as an update.
+     */
+    @Test
+    public void testGetAndUpdate() {
+        Integer sourceId = this.saveSource();
+        StreamSource fetched = sourceService.get(sourceId);
+        Assertions.assertEquals(GLOBAL_GROUP_ID, fetched.getInlongGroupId());
+
+        MqttSource fetchedMqtt = (MqttSource) fetched;
+        MqttSourceRequest updateRequest = CommonBeanUtils.copyProperties(fetchedMqtt, MqttSourceRequest::new);
+        boolean updated = sourceService.update(updateRequest, GLOBAL_OPERATOR);
+        Assertions.assertTrue(updated);
+
+        sourceService.delete(sourceId, GLOBAL_OPERATOR);
+    }
+
+}

Reply via email to