This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a763bd5c3 [INLONG-6652][Manager] Support MQTT source (#6687)
a763bd5c3 is described below
commit a763bd5c3edcdae8717e59984f2b0241ad20cdea
Author: Lizhen <[email protected]>
AuthorDate: Fri Dec 2 10:01:49 2022 +0800
[INLONG-6652][Manager] Support MQTT source (#6687)
---
.../inlong/manager/common/consts/SourceType.java | 2 +
.../manager/pojo/source/mqtt/MqttSource.java | 72 ++++++++++++++++
.../manager/pojo/source/mqtt/MqttSourceDTO.java | 77 +++++++++++++++++
.../pojo/source/mqtt/MqttSourceRequest.java | 61 +++++++++++++
.../service/source/mqtt/MqttSourceOperator.java | 81 ++++++++++++++++++
.../service/source/MqttSourceServiceTest.java | 99 ++++++++++++++++++++++
6 files changed, 392 insertions(+)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 4d6d3a4c1..ce01cafcc 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -40,6 +40,7 @@ public class SourceType {
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
public static final String REDIS = "REDIS";
+ public static final String MQTT = "MQTT";
public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new
HashMap<String, TaskTypeEnum>() {
@@ -57,6 +58,7 @@ public class SourceType {
put(SQLSERVER, TaskTypeEnum.SQLSERVER);
put(MONGODB, TaskTypeEnum.MONGODB);
put(REDIS, TaskTypeEnum.REDIS);
+ put(MQTT, TaskTypeEnum.MQTT);
}
};
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSource.java
new file mode 100644
index 000000000..8b92911eb
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.mqtt;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Mqtt source info")
+@JsonTypeDefine(value = SourceType.MQTT)
+public class MqttSource extends StreamSource {
+
+ @ApiModelProperty("ServerURI of the Mqtt server")
+ private String serverURI;
+
+ @ApiModelProperty("Username of the Mqtt server")
+ private String username;
+
+ @ApiModelProperty("Password of the Mqtt server")
+ private String password;
+
+ @ApiModelProperty("Topic of the Mqtt server")
+ private String topic;
+
+ @ApiModelProperty("Mqtt qos")
+ private int qos = 1;
+
+ @ApiModelProperty("Client Id")
+ private String clientId;
+
+ @ApiModelProperty("Mqtt version")
+ private String mqttVersion;
+
+ public MqttSource() {
+ this.setSourceType(SourceType.MQTT);
+ }
+
+ @Override
+ public SourceRequest genSourceRequest() {
+ return CommonBeanUtils.copyProperties(this, MqttSourceRequest::new);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceDTO.java
new file mode 100644
index 000000000..9079200e0
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceDTO.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.mqtt;
+
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MqttSourceDTO {
+
+ @ApiModelProperty("ServerURI of the Mqtt server")
+ private String serverURI;
+
+ @ApiModelProperty("Username of the Mqtt server")
+ private String username;
+
+ @ApiModelProperty("Password of the Mqtt server")
+ private String password;
+
+ @ApiModelProperty("Topic of the Mqtt server")
+ private String topic;
+
+ @ApiModelProperty("Mqtt qos")
+ private int qos = 1;
+
+ @ApiModelProperty("Client Id")
+ private String clientId;
+
+ @ApiModelProperty("Mqtt version")
+ private String mqttVersion;
+
+ public static MqttSourceDTO getFromRequest(MqttSourceRequest request) {
+ return MqttSourceDTO.builder()
+ .serverURI(request.getServerURI())
+ .username(request.getUsername())
+ .password(request.getPassword())
+ .topic(request.getTopic())
+ .qos(request.getQos())
+ .clientId(request.getClientId())
+ .mqttVersion(request.getMqttVersion())
+ .build();
+ }
+
+ public static MqttSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, MqttSourceDTO.class);
+ } catch (Exception e) {
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceRequest.java
new file mode 100644
index 000000000..3a00aedd4
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mqtt/MqttSourceRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.mqtt;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "MQTT source request")
+@JsonTypeDefine(value = SourceType.MQTT)
+public class MqttSourceRequest extends SourceRequest {
+
+ @ApiModelProperty("ServerURI of the Mqtt server")
+ private String serverURI;
+
+ @ApiModelProperty("Username of the Mqtt server")
+ private String username;
+
+ @ApiModelProperty("Password of the Mqtt server")
+ private String password;
+
+ @ApiModelProperty("Topic of the Mqtt server")
+ private String topic;
+
+ @ApiModelProperty("Mqtt qos")
+ private int qos = 1;
+
+ @ApiModelProperty("Client Id")
+ private String clientId;
+
+ @ApiModelProperty("Mqtt version")
+ private String mqttVersion;
+
+ public MqttSourceRequest() {
+ this.setSourceType(SourceType.MQTT);
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
new file mode 100644
index 000000000..a573c9a85
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.mqtt;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSourceDTO;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+public class MqttSourceOperator extends AbstractSourceOperator {
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.MQTT.equals(sourceType);
+ }
+
+ @Override
+ protected String getSourceType() {
+ return SourceType.MQTT;
+ }
+
+ @Override
+ protected void setTargetEntity(SourceRequest request, StreamSourceEntity
targetEntity) {
+ MqttSourceRequest sourceRequest = (MqttSourceRequest) request;
+ CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+ try {
+ MqttSourceDTO dto = MqttSourceDTO.getFromRequest(sourceRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ }
+ }
+
+ @Override
+ public StreamSource getFromEntity(StreamSourceEntity entity) {
+ MqttSource source = new MqttSource();
+ if (entity == null) {
+ return source;
+ }
+
+ MqttSourceDTO dto = MqttSourceDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(entity, source, true);
+ CommonBeanUtils.copyProperties(dto, source, true);
+
+ List<StreamField> streamFields = super.getSourceFields(entity.getId());
+ source.setFieldList(streamFields);
+ return source;
+ }
+}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/MqttSourceServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/MqttSourceServiceTest.java
new file mode 100644
index 000000000..83fca51b7
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/MqttSourceServiceTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSource;
+import org.apache.inlong.manager.pojo.source.mqtt.MqttSourceRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Mqtt source service test
+ */
+public class MqttSourceServiceTest extends ServiceBaseTest {
+
+ private static final String serverURI = "mqtt://broker.emqx.io:1883";
+ private static final String username = "inlong";
+ private static final String password = "123456";
+ private static final String clientId = "mqttx_8f19ed2a";
+ private static final String mqttVersion = "5.0";
+ private final String sourceName = "stream_source_service_test";
+ @Autowired
+ private StreamSourceService sourceService;
+ @Autowired
+ private InlongStreamServiceTest streamServiceTest;
+
+ /**
+ * Save source info.
+ */
+ public Integer saveSource() {
+ streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID,
GLOBAL_OPERATOR);
+
+ MqttSourceRequest sourceInfo = new MqttSourceRequest();
+ sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
+ sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
+ sourceInfo.setSourceName(sourceName);
+ sourceInfo.setSourceType(SourceType.MQTT);
+ sourceInfo.setServerURI(serverURI);
+ sourceInfo.setUsername(username);
+ sourceInfo.setPassword(password);
+ sourceInfo.setClientId(clientId);
+ sourceInfo.setMqttVersion(mqttVersion);
+
+ return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
+ }
+
+ @Test
+ public void testSaveAndDelete() {
+ Integer id = this.saveSource();
+ Assertions.assertNotNull(id);
+
+ boolean result = sourceService.delete(id, GLOBAL_OPERATOR);
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testListByIdentifier() {
+ Integer id = this.saveSource();
+ StreamSource source = sourceService.get(id);
+ Assertions.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId());
+
+ sourceService.delete(id, GLOBAL_OPERATOR);
+ }
+
+ @Test
+ public void testGetAndUpdate() {
+ Integer id = this.saveSource();
+ StreamSource response = sourceService.get(id);
+ Assertions.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
+
+ MqttSource mqttSource = (MqttSource) response;
+ MqttSourceRequest request = CommonBeanUtils.copyProperties(mqttSource,
MqttSourceRequest::new);
+ boolean result = sourceService.update(request, GLOBAL_OPERATOR);
+ Assertions.assertTrue(result);
+
+ sourceService.delete(id, GLOBAL_OPERATOR);
+ }
+
+}