This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cb03b1b606 [INLONG-8823][Manager] Supporting data flow to Pulsar (#8824)
cb03b1b606 is described below
commit cb03b1b606809a4022593e099342984df1dfa6f2
Author: castor <[email protected]>
AuthorDate: Thu Sep 7 12:30:58 2023 +0800
[INLONG-8823][Manager] Supporting data flow to Pulsar (#8824)
Co-authored-by: castorqin <[email protected]>
---
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../inlong/manager/common/consts/SourceType.java | 1 -
.../inlong/manager/common/consts/StreamType.java | 2 +-
.../pojo/node/pulsar/PulsarDataNodeDTO.java | 75 ++++++++++++
.../pojo/node/pulsar/PulsarDataNodeInfo.java | 55 +++++++++
.../pojo/node/pulsar/PulsarDataNodeRequest.java | 49 ++++++++
.../manager/pojo/sink/pulsar/PulsarSink.java | 75 ++++++++++++
.../manager/pojo/sink/pulsar/PulsarSinkDTO.java | 72 ++++++++++++
.../pojo/sink/pulsar/PulsarSinkRequest.java | 51 +++++++++
.../node/pulsar/PulsarDataNodeOperator.java | 120 +++++++++++++++++++
.../service/resource/queue/pulsar/PulsarUtils.java | 2 +-
.../sink/pulsar/PulsarResourceOperator.java | 121 ++++++++++++++++++++
.../manager/service/sink/AbstractSinkOperator.java | 10 ++
.../service/sink/pulsar/PulsarSinkOperator.java | 127 +++++++++++++++++++++
14 files changed, 758 insertions(+), 3 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 09bb35344a..6e1002612c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -36,4 +36,5 @@ public class DataNodeType {
public static final String ORACLE = "ORACLE";
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
+ public static final String PULSAR = "PULSAR";
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index e01989501a..f40593e421 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -29,7 +29,6 @@ public class SourceType extends StreamType {
public static final String AUTO_PUSH = "AUTO_PUSH";
public static final String TUBEMQ = "TUBEMQ";
- public static final String PULSAR = "PULSAR";
public static final String FILE = "FILE";
public static final String MYSQL_SQL = "MYSQL_SQL";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index 26685814de..77ba1e3ab5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -27,5 +27,5 @@ public class StreamType {
public static final String POSTGRESQL = "POSTGRESQL";
public static final String SQLSERVER = "SQLSERVER";
public static final String ORACLE = "ORACLE";
-
+ public static final String PULSAR = "PULSAR";
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeDTO.java
new file mode 100644
index 0000000000..351c29ab11
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeDTO.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.pulsar;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Pulsar-specific parameters of a data node: service URL, admin URL and token.
+ * Instances can be built from a request or parsed from an extParams JSON string.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Pulsar data node info")
+public class PulsarDataNodeDTO {
+
+    @ApiModelProperty("Pulsar service url")
+    private String serviceUrl;
+
+    @ApiModelProperty("Pulsar admin url")
+    private String adminUrl;
+
+    @ApiModelProperty(value = "Pulsar token")
+    private String token;
+
+    /**
+     * Builds a DTO for the given request.
+     *
+     * <p>A non-blank {@code extParams} is parsed first and used as the base object; otherwise an
+     * empty DTO is used. The request's properties are then copied onto that base.
+     *
+     * @param request the data node request whose properties are copied onto the DTO
+     * @param extParams previously stored parameters as JSON, may be blank
+     * @return the object returned by {@code CommonBeanUtils.copyProperties}
+     */
+    public static PulsarDataNodeDTO getFromRequest(PulsarDataNodeRequest request, String extParams) {
+        PulsarDataNodeDTO base;
+        if (StringUtils.isBlank(extParams)) {
+            base = new PulsarDataNodeDTO();
+        } else {
+            base = getFromJson(extParams);
+        }
+        return CommonBeanUtils.copyProperties(request, base, true);
+    }
+
+    /**
+     * Parses a DTO from its JSON representation.
+     *
+     * @param extParams the JSON string to parse
+     * @return the parsed DTO
+     * @throws BusinessException with {@code GROUP_INFO_INCORRECT} if parsing fails for any reason
+     */
+    public static PulsarDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, PulsarDataNodeDTO.class);
+        } catch (Exception e) {
+            String detail = String.format("Failed to parse extParams for pulsar node: %s", e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT, detail);
+        }
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeInfo.java
new file mode 100644
index 0000000000..8626a0cd55
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Data node info for Pulsar. Adds the service URL and admin URL to the common
+ * {@link DataNodeInfo} fields and fixes the node type to {@code DataNodeType.PULSAR}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.PULSAR)
+@ApiModel("Pulsar data node info")
+public class PulsarDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Pulsar service url")
+    private String serviceUrl;
+
+    @ApiModelProperty("Pulsar admin url")
+    private String adminUrl;
+
+    /**
+     * Creates an info object whose type is already set to Pulsar.
+     */
+    public PulsarDataNodeInfo() {
+        setType(DataNodeType.PULSAR);
+    }
+
+    /**
+     * Creates a new {@link PulsarDataNodeRequest} and copies this object's properties onto it.
+     *
+     * @return the populated request
+     */
+    @Override
+    public PulsarDataNodeRequest genRequest() {
+        PulsarDataNodeRequest request = CommonBeanUtils.copyProperties(this, PulsarDataNodeRequest::new);
+        return request;
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeRequest.java
new file mode 100644
index 0000000000..0693f03aa2
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/pulsar/PulsarDataNodeRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Request for creating or updating a Pulsar data node.
+ * Adds the Pulsar service URL and admin URL to the fields inherited from DataNodeRequest.
+ * NOTE(review): PulsarDataNodeOperator calls getToken() on this request; presumably the
+ * token field is inherited from DataNodeRequest - confirm.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.PULSAR)
+@ApiModel("Pulsar data node request")
+public class PulsarDataNodeRequest extends DataNodeRequest {
+
+ // URL of the Pulsar service
+ @ApiModelProperty("Pulsar service url")
+ private String serviceUrl;
+
+ // URL of the Pulsar admin endpoint
+ @ApiModelProperty("Pulsar admin url")
+ private String adminUrl;
+
+ /**
+  * Creates a request whose type is already set to DataNodeType.PULSAR.
+  */
+ public PulsarDataNodeRequest() {
+ this.setType(DataNodeType.PULSAR);
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
new file mode 100644
index 0000000000..a29cd33460
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSink.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Stream sink that writes to Pulsar. Holds the target tenant, namespace and topic together
+ * with the connection settings, and fixes the sink type to {@code SinkType.PULSAR}.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Pulsar sink info")
+@JsonTypeDefine(value = SinkType.PULSAR)
+public class PulsarSink extends StreamSink {
+
+    @ApiModelProperty("Pulsar service url")
+    private String serviceUrl;
+
+    @ApiModelProperty("Pulsar tenant")
+    private String pulsarTenant;
+
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
+
+    @ApiModelProperty("Pulsar token")
+    private String token;
+
+    @ApiModelProperty("Admin url")
+    private String adminUrl;
+
+    @ApiModelProperty("Pulsar partition number")
+    private Integer partitionNum;
+
+    /**
+     * Creates a sink whose sink type is already set to Pulsar.
+     */
+    public PulsarSink() {
+        setSinkType(SinkType.PULSAR);
+    }
+
+    /**
+     * Creates a new {@link PulsarSinkRequest} and copies this sink's properties onto it.
+     *
+     * @return the populated sink request
+     */
+    @Override
+    public SinkRequest genSinkRequest() {
+        PulsarSinkRequest request = CommonBeanUtils.copyProperties(this, PulsarSinkRequest::new);
+        return request;
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
new file mode 100644
index 0000000000..1d80e91c8e
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkDTO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Pulsar-specific parameters of a sink: tenant, namespace, topic and partition number.
+ * Instances can be built from a request or parsed from an extParams JSON string.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarSinkDTO {
+
+    @ApiModelProperty("Pulsar tenant")
+    private String pulsarTenant;
+
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
+
+    @ApiModelProperty("Pulsar partition number")
+    private Integer partitionNum;
+
+    /**
+     * Builds a DTO for the given request.
+     *
+     * <p>A non-blank {@code extParams} is parsed first and used as the base object; otherwise an
+     * empty DTO is used. The request's properties are then copied onto that base.
+     *
+     * @param request the sink request whose properties are copied onto the DTO
+     * @param extParams previously stored parameters as JSON, may be blank
+     * @return the object returned by {@code CommonBeanUtils.copyProperties}
+     */
+    public static PulsarSinkDTO getFromRequest(PulsarSinkRequest request, String extParams) {
+        PulsarSinkDTO base;
+        if (StringUtils.isBlank(extParams)) {
+            base = new PulsarSinkDTO();
+        } else {
+            base = getFromJson(extParams);
+        }
+        return CommonBeanUtils.copyProperties(request, base, true);
+    }
+
+    /**
+     * Parses a DTO from its JSON representation.
+     *
+     * @param extParams the JSON string to parse
+     * @return the parsed DTO
+     * @throws BusinessException with {@code SINK_INFO_INCORRECT} if parsing fails for any reason
+     */
+    public static PulsarSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, PulsarSinkDTO.class);
+        } catch (Exception e) {
+            String detail = String.format("parse extParams of pulsar SinkDTO failure: %s", e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, detail);
+        }
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkRequest.java
new file mode 100644
index 0000000000..ea8c301fcf
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/pulsar/PulsarSinkRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Request for creating or updating a Pulsar sink.
+ * Adds the target tenant, namespace, topic and partition number to the fields
+ * inherited from SinkRequest.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Pulsar sink request")
+@JsonTypeDefine(value = SinkType.PULSAR)
+public class PulsarSinkRequest extends SinkRequest {
+
+ // Pulsar tenant of the target topic
+ @ApiModelProperty("Pulsar tenant")
+ private String pulsarTenant;
+
+ // Pulsar namespace of the target topic
+ @ApiModelProperty("Pulsar namespace")
+ private String namespace;
+
+ // Name of the target topic
+ @ApiModelProperty("Pulsar topic")
+ private String topic;
+
+ // Partition number of the target topic; a boxed Integer, so it may be null
+ @ApiModelProperty("Pulsar partition number")
+ private Integer partitionNum;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
new file mode 100644
index 0000000000..819df3df34
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Data node operator for Pulsar. Converts between the persisted {@link DataNodeEntity} and
+ * {@link PulsarDataNodeInfo}, serializes Pulsar-specific settings into the entity's extParams,
+ * and tests connectivity against the Pulsar admin URL.
+ */
+@Service
+public class PulsarDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tells whether this operator handles the given data node type.
+     *
+     * @param dataNodeType the type to check
+     * @return true if the type equals {@link #getDataNodeType()}
+     */
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    /**
+     * @return {@code DataNodeType.PULSAR}
+     */
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.PULSAR;
+    }
+
+    /**
+     * Builds a {@link PulsarDataNodeInfo} from the entity, overlaying the values parsed from the
+     * entity's extParams when they are not blank.
+     *
+     * @param entity the persisted data node
+     * @return the populated info object
+     * @throws BusinessException with {@code DATA_NODE_NOT_FOUND} if the entity is null
+     */
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        PulsarDataNodeInfo pulsarDataNodeInfo = new PulsarDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, pulsarDataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            PulsarDataNodeDTO dto = PulsarDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, pulsarDataNodeInfo);
+        }
+        return pulsarDataNodeInfo;
+    }
+
+    /**
+     * Copies the request onto the target entity and stores the Pulsar-specific settings as JSON
+     * in the entity's extParams.
+     *
+     * @param request the incoming request; must be a {@link PulsarDataNodeRequest}
+     * @param targetEntity the entity to update
+     * @throws BusinessException if the extParams cannot be built or serialized
+     */
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        PulsarDataNodeRequest nodeRequest = (PulsarDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(nodeRequest, targetEntity, true);
+        try {
+            PulsarDataNodeDTO dto = PulsarDataNodeDTO.getFromRequest(nodeRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for pulsar node: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Tests the connection to the Pulsar admin URL given in the request.
+     *
+     * @param request the incoming request; must be a {@link PulsarDataNodeRequest}
+     * @return true when the connection test did not throw
+     * @throws BusinessException if the admin URL is blank or the connection attempt fails
+     */
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        PulsarDataNodeRequest pulsarDataNodeRequest = (PulsarDataNodeRequest) request;
+        String adminUrl = pulsarDataNodeRequest.getAdminUrl();
+        String token = pulsarDataNodeRequest.getToken();
+        Preconditions.expectNotBlank(adminUrl, ErrorCodeEnum.INVALID_PARAMETER,
+                "connection admin url cannot be empty");
+        if (getPulsarConnection(adminUrl, token)) {
+            // The token is a credential and is deliberately kept out of the log.
+            LOGGER.info("pulsar connection success for adminUrl={}", adminUrl);
+        }
+        return true;
+    }
+
+    /**
+     * Opens a Pulsar admin client for the given URL and token, lists the tenants as a
+     * connectivity probe, and closes the client again.
+     *
+     * @param adminUrl the Pulsar admin URL
+     * @param token the authentication token passed to the admin client
+     * @return true if the probe succeeded
+     * @throws BusinessException if the client cannot be created or the probe fails
+     */
+    private boolean getPulsarConnection(String adminUrl, String token) {
+        PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                .adminUrl(adminUrl)
+                .token(token)
+                .build();
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+            // test connect for pulsar adminUrl
+            pulsarAdmin.tenants().getTenants();
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("Pulsar connection failed for AdminUrl=%s",
+                    pulsarClusterInfo.getAdminUrl());
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index dacebf7314..81c3de2b6a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -70,7 +70,7 @@ public class PulsarUtils {
*
* @apiNote It must be closed after use.
*/
- private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String
token) throws PulsarClientException {
+ public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String
token) throws PulsarClientException {
return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
.authentication(AuthenticationFactory.token(token)).build();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
new file mode 100644
index 0000000000..8f5935f8e4
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sink resource operator for Pulsar. Creates the tenant, namespace and topic that a Pulsar
+ * sink writes to, using the admin URL and token of the sink's data node.
+ */
+@Service
+public class PulsarResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarResourceOperator.class);
+
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+    @Autowired
+    private PulsarOperator pulsarOperator;
+
+    /**
+     * Tells whether this operator handles the given sink type.
+     *
+     * @param sinkType the sink type to check
+     * @return true if the type is {@code SinkType.PULSAR}
+     */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.PULSAR.equals(sinkType);
+    }
+
+    /**
+     * Creates the Pulsar resources for the sink. Nothing is created when the sink is already
+     * configured successfully or when resource creation is disabled for it.
+     *
+     * @param sinkInfo the sink to create resources for
+     * @throws BusinessException if the Pulsar resources cannot be created
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("sink resource [{}] already success, skip to create", sinkInfo.getId());
+            return;
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOG.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
+            return;
+        }
+        this.createTopic(sinkInfo);
+    }
+
+    /**
+     * Creates the tenant, the namespace (with a default {@link InlongPulsarInfo}) and the topic
+     * described by the sink's extParams. The admin client is closed when the method exits.
+     *
+     * @param sinkInfo the sink whose extParams and data node describe the target topic
+     * @throws BusinessException if the admin client cannot be created or a Pulsar admin call fails
+     */
+    private void createTopic(SinkInfo sinkInfo) {
+        PulsarSinkDTO pulsarSinkDTO = PulsarSinkDTO.getFromJson(sinkInfo.getExtParams());
+        PulsarDataNodeDTO pulsarDataNodeInfo = getPulsarDataNodeInfo(sinkInfo);
+        PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                .adminUrl(pulsarDataNodeInfo.getAdminUrl())
+                .token(pulsarDataNodeInfo.getToken())
+                .build();
+        // try-with-resources: the admin client holds connections and must be closed after use
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+            // create pulsar tenant
+            pulsarOperator.createTenant(pulsarAdmin, pulsarSinkDTO.getPulsarTenant());
+            // use default config to create namespace
+            InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
+            pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, pulsarSinkDTO.getPulsarTenant(),
+                    pulsarSinkDTO.getNamespace());
+            // partitionNum is a nullable Integer; a missing value is treated like zero partitions
+            Integer partitionNum = pulsarSinkDTO.getPartitionNum();
+            boolean partitioned = partitionNum != null && partitionNum > 0;
+            String queueModel = partitioned
+                    ? InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL
+                    : InlongConstants.PULSAR_QUEUE_TYPE_SERIAL;
+            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
+                    .pulsarTenant(pulsarSinkDTO.getPulsarTenant())
+                    .namespace(pulsarSinkDTO.getNamespace())
+                    .topicName(pulsarSinkDTO.getTopic())
+                    .numPartitions(partitionNum)
+                    .queueModule(queueModel)
+                    .build();
+            // create topic
+            pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+        } catch (PulsarClientException | PulsarAdminException e) {
+            String errMsg = String.format("failed to create pulsar resource for sinkId=%s: %s",
+                    sinkInfo.getId(), e.getMessage());
+            LOG.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    /**
+     * Looks up the sink's data node and copies its properties into a {@link PulsarDataNodeDTO}.
+     *
+     * @param sinkInfo the sink whose data node name and sink type are used for the lookup
+     * @return a DTO populated from the data node
+     * @throws BusinessException if the sink has no data node name
+     */
+    private PulsarDataNodeDTO getPulsarDataNodeInfo(SinkInfo sinkInfo) {
+        String dataNodeName = sinkInfo.getDataNodeName();
+        Preconditions.expectNotBlank(dataNodeName, ErrorCodeEnum.INVALID_PARAMETER,
+                "Pulsar admin url not specified and data node is empty");
+        PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                dataNodeName, sinkInfo.getSinkType());
+        PulsarDataNodeDTO pulsarDataNodeDTO = new PulsarDataNodeDTO();
+        CommonBeanUtils.copyProperties(dataNodeInfo, pulsarDataNodeDTO, true);
+        return pulsarDataNodeDTO;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 62af32425c..789065951f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -23,8 +23,10 @@ import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -54,6 +56,8 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
protected static final String KEY_GROUP_ID = "inlongGroupId";
protected static final String KEY_STREAM_ID = "inlongStreamId";
+ protected static final String KEY_DATA_TYPE = "dataType";
+ protected static final String KEY_SEPARATOR = "separator";
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSinkOperator.class);
@Autowired
protected StreamSinkEntityMapper sinkMapper;
@@ -61,6 +65,8 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
protected StreamSinkFieldEntityMapper sinkFieldMapper;
@Autowired
protected DataNodeOperateHelper dataNodeHelper;
+ @Autowired
+ protected InlongStreamEntityMapper inlongStreamEntityMapper;
/**
* Setting the parameters of the latest entity.
@@ -222,10 +228,14 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
@Override
public Map<String, String> parse2IdParams(StreamSinkEntity streamSink,
List<String> fields) {
Map<String, String> param;
+ InlongStreamEntity inlongStreamEntity =
inlongStreamEntityMapper.selectByIdentifier(
+ streamSink.getInlongGroupId(), streamSink.getInlongStreamId());
try {
param = JsonUtils.parseObject(streamSink.getExtParams(),
HashMap.class);
// put group and stream info
assert param != null;
+ param.put(KEY_SEPARATOR, inlongStreamEntity.getDataSeparator());
+ param.put(KEY_DATA_TYPE, inlongStreamEntity.getDataType());
param.put(KEY_GROUP_ID, streamSink.getInlongGroupId());
param.put(KEY_STREAM_ID, streamSink.getInlongStreamId());
return param;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
new file mode 100644
index 0000000000..9452d6a73e
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.pulsar;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
+import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Pulsar sink operator: handles stream sinks whose sink type is
+ * {@link SinkType#PULSAR}.
+ *
+ * <p>It serializes Pulsar sink requests into the entity's ext params,
+ * rebuilds a {@link PulsarSink} from a stored entity (merging in the
+ * properties of the referenced Pulsar data node), and adds the full topic
+ * name to the id params.
+ */
+@Service
+public class PulsarSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PulsarSinkOperator.class);
+
+    /** Key under which the full topic name is stored in the id params. */
+    private static final String TOPIC = "topic";
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.PULSAR.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.PULSAR;
+    }
+
+    /**
+     * Builds the Pulsar sink DTO from the request and stores it, serialized
+     * as JSON, in the ext params of the target entity.
+     *
+     * @param request sink request; cast to {@link PulsarSinkRequest} once
+     *        its sink type has been checked
+     * @param targetEntity entity whose ext params are read and overwritten
+     * @throws BusinessException with {@code SINK_TYPE_NOT_SUPPORT} if the
+     *         request's sink type differs from this operator's, or with
+     *         {@code SINK_SAVE_FAILED} if building or serializing the DTO
+     *         fails
+     */
+    @Override
+    protected void setTargetEntity(SinkRequest request,
+            StreamSinkEntity targetEntity) {
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": "
+                            + getSinkType());
+        }
+        PulsarSinkRequest sinkRequest = (PulsarSinkRequest) request;
+        try {
+            PulsarSinkDTO dto = PulsarSinkDTO.getFromRequest(sinkRequest,
+                    targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // The exception thrown below carries only the message text, so
+            // log the original here to keep its stack trace.
+            LOGGER.error("serialize extParams of Pulsar SinkDTO failure", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format(
+                            "serialize extParams of Pulsar SinkDTO failure: %s",
+                            e.getMessage()));
+        }
+    }
+
+    /**
+     * Rebuilds a {@link PulsarSink} from a stored sink entity.
+     *
+     * <p>Properties are copied onto the sink in this order: the entity
+     * itself, the DTO parsed from its ext params, then the DTO of the
+     * referenced Pulsar data node. If the data node cannot be resolved,
+     * that last step is skipped instead of failing the whole call.
+     *
+     * @param entity stored sink entity, may be {@code null}
+     * @return the populated sink, or an empty {@link PulsarSink} when
+     *         {@code entity} is {@code null}
+     */
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        PulsarSink sink = new PulsarSink();
+        if (entity == null) {
+            return sink;
+        }
+        PulsarDataNodeDTO pulsarDataNodeDTO = getDataNodeDTO(entity);
+        PulsarSinkDTO dto = PulsarSinkDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        if (pulsarDataNodeDTO != null) {
+            CommonBeanUtils.copyProperties(pulsarDataNodeDTO, sink, true);
+        }
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    /**
+     * Adds the full Pulsar topic name to the id params produced by the
+     * parent operator.
+     *
+     * <p>If the ext params cannot be parsed into a {@link PulsarSinkDTO},
+     * the error is logged and the parent's params are returned without the
+     * topic entry.
+     *
+     * @param streamSink sink entity whose ext params hold the Pulsar DTO
+     * @param fields passed through unchanged to the parent implementation
+     * @return the id params, with the topic entry when parsing succeeded
+     */
+    @Override
+    public Map<String, String> parse2IdParams(StreamSinkEntity streamSink,
+            List<String> fields) {
+
+        Map<String, String> params = super.parse2IdParams(streamSink, fields);
+        PulsarSinkDTO pulsarSinkDTO;
+        try {
+            pulsarSinkDTO = objectMapper.readValue(streamSink.getExtParams(),
+                    PulsarSinkDTO.class);
+        } catch (JsonProcessingException e) {
+            LOGGER.error("parse pulsar sink dto error", e);
+            return params;
+        }
+        String fullTopicName = getFullTopicName(pulsarSinkDTO);
+        params.put(TOPIC, fullTopicName);
+        return params;
+    }
+
+    /**
+     * Loads the Pulsar data node referenced by the sink entity and parses
+     * its ext params.
+     *
+     * @param entity sink entity naming the data node
+     * @return the parsed data node DTO, or {@code null} when no matching
+     *         data node row exists (a warning is logged) or the parser
+     *         yields nothing
+     */
+    private PulsarDataNodeDTO getDataNodeDTO(StreamSinkEntity entity) {
+        DataNodeEntity dataNodeEntity =
+                dataNodeEntityMapper.selectByUniqueKey(
+                        entity.getDataNodeName(), DataNodeType.PULSAR);
+        if (dataNodeEntity == null) {
+            LOGGER.warn("pulsar data node [{}] not found for sink id={}",
+                    entity.getDataNodeName(), entity.getId());
+            return null;
+        }
+        return JsonUtils.parseObject(dataNodeEntity.getExtParams(),
+                PulsarDataNodeDTO.class);
+    }
+
+    /**
+     * Joins tenant, namespace and topic with {@code /} into
+     * {@code tenant/namespace/topic}.
+     */
+    private String getFullTopicName(PulsarSinkDTO pulsarSinkDTO) {
+        return pulsarSinkDTO.getPulsarTenant() + "/"
+                + pulsarSinkDTO.getNamespace() + "/"
+                + pulsarSinkDTO.getTopic();
+    }
+
+}