This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0b3fdfc051 [INLONG-8964][Manager] Support Sortstandalone cluster management (#8978)
0b3fdfc051 is described below
commit 0b3fdfc051c44e3bef7122f16482f1d85807bd1f
Author: vernedeng <[email protected]>
AuthorDate: Mon Sep 25 19:25:20 2023 +0800
[INLONG-8964][Manager] Support Sortstandalone cluster management (#8978)
* [INLONG-8964][Manager] Support SortStandalone cluster management
---
.../inlong/manager/common/enums/ClusterType.java | 2 +
.../sortstandalone/SortStandaloneClusterDTO.java | 65 +++++++++++++++++
.../sortstandalone/SortStandaloneClusterInfo.java | 52 ++++++++++++++
.../SortStandaloneClusterRequest.java | 46 ++++++++++++
.../cluster/SortStandaloneClusterOperator.java | 84 ++++++++++++++++++++++
.../service/cluster/InlongClusterServiceTest.java | 37 ++++++++++
6 files changed, 286 insertions(+)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
index 2c057641b2..f6a33ff71e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
@@ -33,6 +33,7 @@ public class ClusterType {
public static final String DATAPROXY = "DATAPROXY";
public static final String KAFKA = "KAFKA";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
+ public static final String SORTSTANDALONE = "SORTSTANDALONE";
private static final Set<String> TYPE_SET = new HashSet<String>() {
@@ -43,6 +44,7 @@ public class ClusterType {
add(ClusterType.DATAPROXY);
add(ClusterType.KAFKA);
add(ClusterType.ELASTICSEARCH);
+ add(ClusterType.SORTSTANDALONE);
}
};
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
new file mode 100644
index 0000000000..0eec2ded29
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterDTO.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.Set;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("SortStandalone cluster info")
+public class SortStandaloneClusterDTO {
+
+ @ApiModelProperty(value = "Supported sink types")
+ private Set<String> supportedSinkTypes;
+
+ public static SortStandaloneClusterDTO getFromRequest(SortStandaloneClusterRequest request, String extParams) {
+ SortStandaloneClusterDTO dto = StringUtils.isNotBlank(extParams)
+ ? SortStandaloneClusterDTO.getFromJson(extParams)
+ : new SortStandaloneClusterDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static SortStandaloneClusterDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, SortStandaloneClusterDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+ String.format("parse extParams of SortStandalone Cluster failure: %s", e.getMessage()));
+ }
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
new file mode 100644
index 0000000000..ea14d2a56b
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Set;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
+@ApiModel("Inlong cluster info for SortStandalone")
+public class SortStandaloneClusterInfo extends ClusterInfo {
+
+ @ApiModelProperty(value = "Supported sink types")
+ private Set<String> supportedSinkTypes;
+
+ public SortStandaloneClusterInfo() {
+ this.setType(ClusterType.SORTSTANDALONE);
+ }
+
+ @Override
+ public ClusterRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, SortStandaloneClusterRequest::new);
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
new file mode 100644
index 0000000000..fd8c10e0d0
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sortstandalone/SortStandaloneClusterRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.sortstandalone;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Set;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORTSTANDALONE)
+@ApiModel("Inlong cluster request for SortStandalone")
+public class SortStandaloneClusterRequest extends ClusterRequest {
+
+ @ApiModelProperty(value = "Supported sink types")
+ private Set<String> supportedSinkTypes;
+
+ public SortStandaloneClusterRequest() {
+ this.setType(ClusterType.SORTSTANDALONE);
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
new file mode 100644
index 0000000000..9e04f33c0d
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortStandaloneClusterOperator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterDTO;
+import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class SortStandaloneClusterOperator extends AbstractClusterOperator {
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) {
+ SortStandaloneClusterRequest standaloneRequest = (SortStandaloneClusterRequest) request;
+ CommonBeanUtils.copyProperties(standaloneRequest, targetEntity, true);
+ try {
+ SortStandaloneClusterDTO dto = SortStandaloneClusterDTO.getFromRequest(standaloneRequest,
+ targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ log.debug("success to set entity for SortStandalone cluster");
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+ String.format("serialize extParams of SortStandalone Cluster failure: %s", e.getMessage()));
+ }
+ }
+
+ @Override
+ public Boolean accept(String clusterType) {
+ return getClusterType().equals(clusterType);
+ }
+
+ @Override
+ public String getClusterType() {
+ return ClusterType.SORTSTANDALONE;
+ }
+
+ @Override
+ public ClusterInfo getFromEntity(InlongClusterEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ }
+
+ SortStandaloneClusterInfo clusterInfo = new SortStandaloneClusterInfo();
+ CommonBeanUtils.copyProperties(entity, clusterInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ SortStandaloneClusterDTO dto = SortStandaloneClusterDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, clusterInfo);
+ }
+ return clusterInfo;
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 006429e37c..39106fccda 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -32,6 +32,8 @@ import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
+import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.sortstandalone.SortStandaloneClusterRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -43,7 +45,9 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* Inlong cluster service test for {@link InlongClusterService}
@@ -55,6 +59,15 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
@Autowired
private HeartbeatManager heartbeatManager;
+ public Integer saveStandaloneCluster(String clusterTag, String clusterName, Set<String> supportedSinkTypes) {
+ SortStandaloneClusterRequest request = new SortStandaloneClusterRequest();
+ request.setClusterTags(clusterTag);
+ request.setName(clusterName);
+ request.setSupportedSinkTypes(supportedSinkTypes);
+ request.setInCharges(GLOBAL_OPERATOR);
+ return clusterService.save(request, GLOBAL_OPERATOR);
+ }
+
/**
* Save data proxy cluster
*/
@@ -312,6 +325,30 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
}
+ @Test
+ public void testStandaloneCluster() {
+ String clusterTag = "standalone_cluster";
+ String clusterName = "test_standalone";
+ String type1 = "type1";
+ String type2 = "type2";
+ String type3 = "type3";
+ Set<String> supportedType = new HashSet<>();
+ supportedType.add(type1);
+ supportedType.add(type2);
+ supportedType.add(type3);
+
+ Integer id = this.saveStandaloneCluster(clusterTag, clusterName, supportedType);
+ Assertions.assertNotNull(id);
+
+ ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR);
+ Assertions.assertInstanceOf(SortStandaloneClusterInfo.class, info);
+
+ Set<String> types = ((SortStandaloneClusterInfo) info).getSupportedSinkTypes();
+ Assertions.assertTrue(types.contains(type1));
+ Assertions.assertTrue(types.contains(type2));
+ Assertions.assertTrue(types.contains(type3));
+ }
+
@Test
public void testDataProxyCluster() {
String clusterTag = "default_cluster";