This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cb03b1b606 [INLONG-8823][Manager] Supporting data flow to Pulsar 
(#8824)
cb03b1b606 is described below

commit cb03b1b606809a4022593e099342984df1dfa6f2
Author: castor <[email protected]>
AuthorDate: Thu Sep 7 12:30:58 2023 +0800

    [INLONG-8823][Manager] Supporting data flow to Pulsar (#8824)
    
    Co-authored-by: castorqin <[email protected]>
---
 .../inlong/manager/common/consts/DataNodeType.java |   1 +
 .../inlong/manager/common/consts/SourceType.java   |   1 -
 .../inlong/manager/common/consts/StreamType.java   |   2 +-
 .../pojo/node/pulsar/PulsarDataNodeDTO.java        |  75 ++++++++++++
 .../pojo/node/pulsar/PulsarDataNodeInfo.java       |  55 +++++++++
 .../pojo/node/pulsar/PulsarDataNodeRequest.java    |  49 ++++++++
 .../manager/pojo/sink/pulsar/PulsarSink.java       |  75 ++++++++++++
 .../manager/pojo/sink/pulsar/PulsarSinkDTO.java    |  72 ++++++++++++
 .../pojo/sink/pulsar/PulsarSinkRequest.java        |  51 +++++++++
 .../node/pulsar/PulsarDataNodeOperator.java        | 120 +++++++++++++++++++
 .../service/resource/queue/pulsar/PulsarUtils.java |   2 +-
 .../sink/pulsar/PulsarResourceOperator.java        | 121 ++++++++++++++++++++
 .../manager/service/sink/AbstractSinkOperator.java |  10 ++
 .../service/sink/pulsar/PulsarSinkOperator.java    | 127 +++++++++++++++++++++
 14 files changed, 758 insertions(+), 3 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 09bb35344a..6e1002612c 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -36,4 +36,5 @@ public class DataNodeType {
     public static final String ORACLE = "ORACLE";
     public static final String SQLSERVER = "SQLSERVER";
     public static final String MONGODB = "MONGODB";
+    public static final String PULSAR = "PULSAR";
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index e01989501a..f40593e421 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -29,7 +29,6 @@ public class SourceType extends StreamType {
 
     public static final String AUTO_PUSH = "AUTO_PUSH";
     public static final String TUBEMQ = "TUBEMQ";
-    public static final String PULSAR = "PULSAR";
 
     public static final String FILE = "FILE";
     public static final String MYSQL_SQL = "MYSQL_SQL";
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index 26685814de..77ba1e3ab5 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -27,5 +27,5 @@ public class StreamType {
     public static final String POSTGRESQL = "POSTGRESQL";
     public static final String SQLSERVER = "SQLSERVER";
     public static final String ORACLE = "ORACLE";
-
+    public static final String PULSAR = "PULSAR";
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeDTO.java
new file mode 100644
index 0000000000..351c29ab11
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeDTO.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.pulsar;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Pulsar data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Pulsar data node info")
+public class PulsarDataNodeDTO {
+
+    @ApiModelProperty("Pulsar service url")
+    private String serviceUrl;
+
+    @ApiModelProperty("Pulsar admin url")
+    private String adminUrl;
+
+    @ApiModelProperty(value = "Pulsar token")
+    private String token;
+
+    /**
+     * Builds the DTO for a node request, starting from any previously stored extParams.
+     *
+     * @param request   node request whose properties are copied onto the DTO
+     * @param extParams existing extParams JSON; when blank, a new empty DTO is used as the base
+     * @return the DTO after the request properties have been copied onto it
+     */
+    public static PulsarDataNodeDTO getFromRequest(PulsarDataNodeRequest request, String extParams) {
+        PulsarDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+                ? PulsarDataNodeDTO.getFromJson(extParams)
+                : new PulsarDataNodeDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Parses the extParams JSON string into a DTO instance.
+     *
+     * @param extParams JSON string holding the Pulsar data node settings
+     * @return the parsed DTO
+     * @throws BusinessException with {@code GROUP_INFO_INCORRECT} when the JSON cannot be parsed
+     */
+    public static PulsarDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, PulsarDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+                    String.format("Failed to parse extParams for pulsar node: %s", e.getMessage()));
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeInfo.java
new file mode 100644
index 0000000000..8626a0cd55
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Pulsar data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.PULSAR)
+@ApiModel("Pulsar data node info")
+public class PulsarDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Pulsar service url")
+    private String serviceUrl;
+
+    @ApiModelProperty("Pulsar admin url")
+    private String adminUrl;
+
+    public PulsarDataNodeInfo() {
+        this.setType(DataNodeType.PULSAR);
+    }
+
+    /**
+     * Creates a Pulsar data node request populated from this info object.
+     *
+     * @return a new request carrying the properties copied from this instance
+     */
+    @Override
+    public PulsarDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, PulsarDataNodeRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeRequest.java
new file mode 100644
index 0000000000..0693f03aa2
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Pulsar data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.PULSAR)
+@ApiModel("Pulsar data node request")
+public class PulsarDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("Pulsar service url")
+    private String serviceUrl;
+
+    @ApiModelProperty("Pulsar admin url")
+    private String adminUrl;
+
+    public PulsarDataNodeRequest() {
+        this.setType(DataNodeType.PULSAR);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
new file mode 100644
index 0000000000..a29cd33460
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Pulsar sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Pulsar sink info")
+@JsonTypeDefine(value = SinkType.PULSAR)
+public class PulsarSink extends StreamSink {
+
+    @ApiModelProperty("Pulsar service url")
+    private String serviceUrl;
+
+    @ApiModelProperty("Pulsar tenant")
+    private String pulsarTenant;
+
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
+
+    @ApiModelProperty("Pulsar token")
+    private String token;
+
+    @ApiModelProperty("Admin url")
+    private String adminUrl;
+
+    @ApiModelProperty("Pulsar partition number")
+    private Integer partitionNum;
+
+    public PulsarSink() {
+        this.setSinkType(SinkType.PULSAR);
+    }
+
+    /**
+     * Creates a Pulsar sink request populated from this sink.
+     *
+     * @return a new request carrying the properties copied from this instance
+     */
+    @Override
+    public SinkRequest genSinkRequest() {
+        PulsarSinkRequest sinkRequest = CommonBeanUtils.copyProperties(this, PulsarSinkRequest::new);
+        return sinkRequest;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
new file mode 100644
index 0000000000..1d80e91c8e
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Pulsar sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarSinkDTO {
+
+    @ApiModelProperty("Pulsar tenant")
+    private String pulsarTenant;
+
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
+
+    @ApiModelProperty("Pulsar partition number")
+    private Integer partitionNum;
+
+    /**
+     * Builds the DTO for a sink request, starting from any previously stored extParams.
+     *
+     * @param request   sink request whose properties are copied onto the DTO
+     * @param extParams existing extParams JSON; when blank, a new empty DTO is used as the base
+     * @return the DTO after the request properties have been copied onto it
+     */
+    public static PulsarSinkDTO getFromRequest(PulsarSinkRequest request, String extParams) {
+        PulsarSinkDTO dto = StringUtils.isNotBlank(extParams)
+                ? PulsarSinkDTO.getFromJson(extParams)
+                : new PulsarSinkDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Parses the extParams JSON string into a DTO instance.
+     *
+     * @param extParams JSON string holding the Pulsar sink settings
+     * @return the parsed DTO
+     * @throws BusinessException with {@code SINK_INFO_INCORRECT} when the JSON cannot be parsed
+     */
+    public static PulsarSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, PulsarSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of pulsar SinkDTO failure: %s", e.getMessage()));
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkRequest.java
new file mode 100644
index 0000000000..ea8c301fcf
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Pulsar sink request.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Pulsar sink request")
+@JsonTypeDefine(value = SinkType.PULSAR)
+public class PulsarSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Pulsar tenant")
+    private String pulsarTenant;
+
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
+
+    @ApiModelProperty("Pulsar partition number")
+    private Integer partitionNum;
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
new file mode 100644
index 0000000000..819df3df34
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Pulsar data node operator
+ */
+@Service
+public class PulsarDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tells whether this operator is responsible for the given data node type.
+     *
+     * @param dataNodeType type to check
+     * @return whether it equals the type reported by {@link #getDataNodeType()}
+     */
+    @Override
+    public Boolean accept(String dataNodeType) {
+        final String supportedType = getDataNodeType();
+        return supportedType.equals(dataNodeType);
+    }
+
+    /**
+     * Returns the data node type served by this operator.
+     *
+     * @return {@link DataNodeType#PULSAR}
+     */
+    @Override
+    public String getDataNodeType() {
+        final String nodeType = DataNodeType.PULSAR;
+        return nodeType;
+    }
+
+    /**
+     * Converts a stored data node entity into Pulsar data node info.
+     *
+     * @param entity the persisted data node; must not be null
+     * @return node info holding the entity properties plus any values parsed from its extParams
+     * @throws BusinessException with {@code DATA_NODE_NOT_FOUND} when the entity is null
+     */
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        PulsarDataNodeInfo pulsarDataNodeInfo = new PulsarDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, pulsarDataNodeInfo);
+        // extParams carries the Pulsar-specific settings as JSON; nothing to merge when it is blank
+        String extParams = entity.getExtParams();
+        if (StringUtils.isNotBlank(extParams)) {
+            PulsarDataNodeDTO dto = PulsarDataNodeDTO.getFromJson(extParams);
+            CommonBeanUtils.copyProperties(dto, pulsarDataNodeInfo);
+        }
+        return pulsarDataNodeInfo;
+    }
+
+    /**
+     * Copies the request onto the target entity and rebuilds the entity's extParams JSON.
+     *
+     * @param request      data node request; cast to {@link PulsarDataNodeRequest}
+     * @param targetEntity entity to populate; its current extParams serve as the base for the new DTO
+     * @throws BusinessException with {@code SOURCE_INFO_INCORRECT} when the extParams cannot be built
+     */
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        PulsarDataNodeRequest nodeRequest = (PulsarDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(nodeRequest, targetEntity, true);
+        try {
+            PulsarDataNodeDTO dto = PulsarDataNodeDTO.getFromRequest(nodeRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for pulsar node: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Tests whether the Pulsar admin endpoint named in the request can be reached.
+     *
+     * @param request data node request; cast to {@link PulsarDataNodeRequest}
+     * @return the result of the connection attempt
+     * @throws BusinessException when the admin url is blank or the connection attempt fails
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        PulsarDataNodeRequest pulsarDataNodeRequest = (PulsarDataNodeRequest) request;
+        String adminUrl = pulsarDataNodeRequest.getAdminUrl();
+        String token = pulsarDataNodeRequest.getToken();
+        Preconditions.expectNotBlank(adminUrl, ErrorCodeEnum.INVALID_PARAMETER,
+                "connection admin url cannot be empty");
+        boolean connected = getPulsarConnection(adminUrl, token);
+        if (connected) {
+            // the token is a credential, so it is deliberately kept out of the log
+            LOGGER.info("pulsar connection success for adminUrl={}", adminUrl);
+        }
+        return connected;
+    }
+
+    /**
+     * Verifies that the Pulsar admin endpoint is reachable by listing its tenants.
+     *
+     * @param adminUrl Pulsar admin url to connect to
+     * @param token    token handed to the cluster info used to build the admin client
+     * @return {@code true} when the tenant listing succeeds
+     * @throws BusinessException when the admin client cannot be built or the call fails
+     */
+    private boolean getPulsarConnection(String adminUrl, String token) {
+        PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                .adminUrl(adminUrl)
+                .token(token)
+                .build();
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+            // test connect for pulsar adminUrl
+            pulsarAdmin.tenants().getTenants();
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarClusterInfo.getAdminUrl());
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index dacebf7314..81c3de2b6a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -70,7 +70,7 @@ public class PulsarUtils {
      *
      * @apiNote It must be closed after use.
      */
-    private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String 
token) throws PulsarClientException {
+    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String 
token) throws PulsarClientException {
         return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
                 .authentication(AuthenticationFactory.token(token)).build();
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
new file mode 100644
index 0000000000..8f5935f8e4
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Pulsar resource operate for creating pulsar resource
+ */
+@Service
+public class PulsarResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarResourceOperator.class);
+
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+    @Autowired
+    private PulsarOperator pulsarOperator;
+
+    /**
+     * Tells whether this operator is responsible for the given sink type.
+     *
+     * @param sinkType type to check
+     * @return whether it equals {@link SinkType#PULSAR}
+     */
+    @Override
+    public Boolean accept(String sinkType) {
+        final boolean isPulsarSink = SinkType.PULSAR.equals(sinkType);
+        return isPulsarSink;
+    }
+
+    /**
+     * Creates the Pulsar resources for a sink unless the sink is already configured
+     * or resource creation is disabled for it.
+     *
+     * @param sinkInfo sink whose resources should be created
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("sink resource [{}] already success, skip to create", sinkInfo.getId());
+            return;
+        }
+        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOG.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
+            return;
+        }
+        this.createTopic(sinkInfo);
+    }
+
+    /**
+     * Creates the tenant, namespace and topic described by the sink's extParams,
+     * using the admin url and token of the sink's data node.
+     *
+     * @param sinkInfo sink whose extParams and data node name identify what to create
+     * @throws BusinessException when the admin client cannot be built or an admin call fails
+     */
+    private void createTopic(SinkInfo sinkInfo) {
+        PulsarSinkDTO pulsarSinkDTO = PulsarSinkDTO.getFromJson(sinkInfo.getExtParams());
+        PulsarDataNodeDTO pulsarDataNodeInfo = getPulsarDataNodeInfo(sinkInfo);
+        PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                .adminUrl(pulsarDataNodeInfo.getAdminUrl())
+                .token(pulsarDataNodeInfo.getToken())
+                .build();
+        // try-with-resources so the admin client is closed on both the success and the failure path
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+            // create pulsar tenant
+            pulsarOperator.createTenant(pulsarAdmin, pulsarSinkDTO.getPulsarTenant());
+            // use default config to create namespace
+            InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
+            pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, pulsarSinkDTO.getPulsarTenant(),
+                    pulsarSinkDTO.getNamespace());
+            // partitionNum is optional in the request; a missing value is treated as 0 (serial queue)
+            Integer partitionNum = pulsarSinkDTO.getPartitionNum();
+            int numPartitions = partitionNum == null ? 0 : partitionNum;
+            String queueModel = numPartitions > 0
+                    ? InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL
+                    : InlongConstants.PULSAR_QUEUE_TYPE_SERIAL;
+            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
+                    .pulsarTenant(pulsarSinkDTO.getPulsarTenant())
+                    .namespace(pulsarSinkDTO.getNamespace())
+                    .topicName(pulsarSinkDTO.getTopic())
+                    .numPartitions(numPartitions)
+                    .queueModule(queueModel)
+                    .build();
+            // create topic
+            pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+        } catch (PulsarClientException | PulsarAdminException e) {
+            String errMsg = String.format("failed to create pulsar resource for sinkId=%s, adminUrl=%s",
+                    sinkInfo.getId(), pulsarClusterInfo.getAdminUrl());
+            LOG.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    /**
+     * Looks up the sink's data node and copies its properties into a DTO.
+     *
+     * @param sinkInfo sink whose data node name and sink type are used for the lookup
+     * @return a DTO populated from the data node info
+     * @throws BusinessException when the sink has no data node name
+     */
+    private PulsarDataNodeDTO getPulsarDataNodeInfo(SinkInfo sinkInfo) {
+        String dataNodeName = sinkInfo.getDataNodeName();
+        Preconditions.expectNotBlank(dataNodeName, ErrorCodeEnum.INVALID_PARAMETER,
+                "Pulsar admin url not specified and data node is empty");
+        PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                dataNodeName, sinkInfo.getSinkType());
+        PulsarDataNodeDTO pulsarDataNodeDTO = new PulsarDataNodeDTO();
+        CommonBeanUtils.copyProperties(dataNodeInfo, pulsarDataNodeDTO, true);
+        return pulsarDataNodeDTO;
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 62af32425c..789065951f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -23,8 +23,10 @@ import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -54,6 +56,8 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
 
     protected static final String KEY_GROUP_ID = "inlongGroupId";
     protected static final String KEY_STREAM_ID = "inlongStreamId";
+    protected static final String KEY_DATA_TYPE = "dataType";
+    protected static final String KEY_SEPARATOR = "separator";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSinkOperator.class);
     @Autowired
     protected StreamSinkEntityMapper sinkMapper;
@@ -61,6 +65,8 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
     protected StreamSinkFieldEntityMapper sinkFieldMapper;
     @Autowired
     protected DataNodeOperateHelper dataNodeHelper;
+    @Autowired
+    protected InlongStreamEntityMapper inlongStreamEntityMapper;
 
     /**
      * Setting the parameters of the latest entity.
@@ -222,10 +228,14 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
     @Override
     public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, 
List<String> fields) {
         Map<String, String> param;
+        InlongStreamEntity inlongStreamEntity = 
inlongStreamEntityMapper.selectByIdentifier(
+                streamSink.getInlongGroupId(), streamSink.getInlongStreamId());
         try {
             param = JsonUtils.parseObject(streamSink.getExtParams(), 
HashMap.class);
             // put group and stream info
             assert param != null;
+            param.put(KEY_SEPARATOR, inlongStreamEntity.getDataSeparator());
+            param.put(KEY_DATA_TYPE, inlongStreamEntity.getDataType());
             param.put(KEY_GROUP_ID, streamSink.getInlongGroupId());
             param.put(KEY_STREAM_ID, streamSink.getInlongStreamId());
             return param;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
new file mode 100644
index 0000000000..9452d6a73e
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Pulsar sink operator.
+ *
+ * <p>Handles stream sinks whose type is {@code SinkType.PULSAR}: serializes the request into the
+ * entity's ext params, rebuilds a {@link PulsarSink} from a stored entity, and adds the full topic
+ * name to the id params.
+ */
+@Service
+public class PulsarSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSinkOperator.class);
+
+    /** Key under which the full topic name is stored in the id params. */
+    private static final String TOPIC = "topic";
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
+
+    /**
+     * Tells whether this operator handles the given sink type.
+     *
+     * @param sinkType sink type to check
+     * @return true only when the type equals {@code SinkType.PULSAR}
+     */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.PULSAR.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.PULSAR;
+    }
+
+    /**
+     * Serializes the Pulsar specific part of the request into the ext params of the target entity.
+     *
+     * @param request sink request, its sink type must match {@link #getSinkType()}
+     * @param targetEntity entity whose ext params are replaced by the serialized DTO
+     * @throws BusinessException with SINK_TYPE_NOT_SUPPORT when the sink type does not match, or with
+     *         SINK_SAVE_FAILED when the DTO cannot be built or serialized
+     */
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+        PulsarSinkRequest sinkRequest = (PulsarSinkRequest) request;
+        try {
+            PulsarSinkDTO dto = PulsarSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // the rethrown exception only carries the message, so keep the stack trace in the log
+            LOGGER.error("serialize extParams of Pulsar SinkDTO failure", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Pulsar SinkDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Builds a {@link PulsarSink} from the stored entity.
+     *
+     * <p>Properties are copied onto the sink in this order: the entity itself, the DTO parsed from
+     * the entity's ext params, then the DTO parsed from the referenced Pulsar data node. When the
+     * data node cannot be resolved, the data node step is skipped and a warning is logged instead
+     * of failing with a {@link NullPointerException}.
+     *
+     * @param entity stored sink entity, may be null
+     * @return the populated sink, or an empty {@link PulsarSink} when the entity is null
+     */
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        PulsarSink sink = new PulsarSink();
+        if (entity == null) {
+            return sink;
+        }
+        // the lookup may yield nothing, e.g. the data node name is unset or the node was removed
+        DataNodeEntity dataNodeEntity = dataNodeEntityMapper.selectByUniqueKey(entity.getDataNodeName(),
+                DataNodeType.PULSAR);
+        PulsarDataNodeDTO pulsarDataNodeDTO = null;
+        if (dataNodeEntity == null) {
+            LOGGER.warn("pulsar data node [{}] not found for sink id={}, skip copying data node properties",
+                    entity.getDataNodeName(), entity.getId());
+        } else {
+            pulsarDataNodeDTO = JsonUtils.parseObject(dataNodeEntity.getExtParams(), PulsarDataNodeDTO.class);
+        }
+        PulsarSinkDTO dto = PulsarSinkDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        if (pulsarDataNodeDTO != null) {
+            CommonBeanUtils.copyProperties(pulsarDataNodeDTO, sink, true);
+        }
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    /**
+     * Extends the id params produced by the parent class with the full Pulsar topic name.
+     *
+     * @param streamSink sink entity whose ext params hold the serialized {@link PulsarSinkDTO}
+     * @param fields passed through to the parent implementation
+     * @return the parent's params plus the {@code topic} entry; when the ext params cannot be
+     *         parsed the error is logged and the parent's params are returned without the topic
+     */
+    @Override
+    public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields) {
+        Map<String, String> params = super.parse2IdParams(streamSink, fields);
+        PulsarSinkDTO pulsarSinkDTO;
+        try {
+            pulsarSinkDTO = objectMapper.readValue(streamSink.getExtParams(), PulsarSinkDTO.class);
+        } catch (JsonProcessingException e) {
+            LOGGER.error("parse pulsar sink dto error", e);
+            return params;
+        }
+        String fullTopicName = getFullTopicName(pulsarSinkDTO);
+        params.put(TOPIC, fullTopicName);
+        return params;
+    }
+
+    /**
+     * Joins tenant, namespace and topic of the DTO with {@code /}.
+     *
+     * @param pulsarSinkDTO sink DTO providing the three name parts
+     * @return a string of the form {@code tenant/namespace/topic}
+     */
+    private String getFullTopicName(PulsarSinkDTO pulsarSinkDTO) {
+        return pulsarSinkDTO.getPulsarTenant() + "/" + pulsarSinkDTO.getNamespace() + "/" + pulsarSinkDTO.getTopic();
+    }
+
+}


Reply via email to