This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3faf124148 [INLONG-10884][Manager] Support configuring HTTP type sink
(#10890)
3faf124148 is described below
commit 3faf124148a74127e900954a2317faac07b9b66a
Author: fuweng11 <[email protected]>
AuthorDate: Mon Aug 26 19:33:59 2024 +0800
[INLONG-10884][Manager] Support configuring HTTP type sink (#10890)
---
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../inlong/manager/common/consts/SinkType.java | 3 +
.../inlong/manager/common/enums/ClusterType.java | 2 +
.../sort/http/SortHttpClusterInfo.java} | 33 +++---
.../sort/http/SortHttpClusterRequest.java} | 31 +++--
.../manager/pojo/node/http/HttpDataNodeDTO.java | 75 ++++++++++++
.../manager/pojo/node/http/HttpDataNodeInfo.java | 63 ++++++++++
.../http/HttpDataNodeRequest.java} | 37 ++++--
.../inlong/manager/pojo/sink/BaseStreamSink.java | 3 +
.../inlong/manager/pojo/sink/SinkRequest.java | 3 +
.../inlong/manager/pojo/sink/StreamSink.java | 3 +
.../inlong/manager/pojo/sink/http/HttpSink.java | 64 ++++++++++
.../inlong/manager/pojo/sink/http/HttpSinkDTO.java | 76 ++++++++++++
.../HttpSinkRequest.java} | 38 ++++--
.../service/cluster/SortClusterOperator.java | 5 +
.../service/node/http/HttpDataNodeOperator.java | 96 +++++++++++++++
.../service/sink/http/HttpSinkOperator.java | 129 +++++++++++++++++++++
17 files changed, 605 insertions(+), 57 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 0f1952c938..47b139f159 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -37,6 +37,7 @@ public class DataNodeType {
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
public static final String DORIS = "DORIS";
+ public static final String HTTP = "HTTP";
public static final String OCEANBASE = "OCEANBASE";
/**
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index 5d069e33df..16a1bfd3d8 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -71,6 +71,9 @@ public class SinkType extends StreamType {
@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String TUBEMQ = "TUBEMQ";
+ @SupportSortType(sortType = SortType.SORT_STANDALONE)
+ public static final String HTTP = "HTTP";
+
@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String OCEANBASE = "OCEANBASE";
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
index d7e8ba7d4b..d16ab1d46c 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
@@ -33,6 +33,7 @@ public class ClusterType {
public static final String DATAPROXY = "DATAPROXY";
public static final String KAFKA = "KAFKA";
+ public static final String SORT_HTTP = "SORT_HTTP";
public static final String SORT_ES = "SORT_ES";
public static final String SORT_CLS = "SORT_CLS";
public static final String SORT_PULSAR = "SORT_PULSAR";
@@ -48,6 +49,7 @@ public class ClusterType {
add(ClusterType.PULSAR);
add(ClusterType.DATAPROXY);
add(ClusterType.KAFKA);
+ add(ClusterType.SORT_HTTP);
add(ClusterType.SORT_ES);
add(ClusterType.SORT_CLS);
add(ClusterType.SORT_PULSAR);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java
similarity index 58%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java
index 09115a9e02..b2660c2df9 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java
@@ -15,26 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.cluster.sort.http;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo;
import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
-/**
- * The base parameter class of StreamSink, support user extend their own
business params.
- */
@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
-
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORT_HTTP)
+@ApiModel("Inlong cluster info for Sort http")
+public class SortHttpClusterInfo extends BaseSortClusterInfo {
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
-}
+ public SortHttpClusterInfo() {
+ this.setType(ClusterType.SORT_HTTP);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java
similarity index 58%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java
index 09115a9e02..a7fd4d027e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java
@@ -15,26 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.cluster.sort.http;
+
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest;
import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
-/**
- * The base parameter class of StreamSink, support user extend their own
business params.
- */
@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
-
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.SORT_HTTP)
+@ApiModel("Inlong cluster request for Sort http")
+public class SortHttpClusterRequest extends BaseSortClusterRequest {
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
+ public SortHttpClusterRequest() {
+ this.setType(ClusterType.SORT_HTTP);
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java
new file mode 100644
index 0000000000..52fa047606
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.http;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Http service data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Http service data node info")
+public class HttpDataNodeDTO {
+
+ @ApiModelProperty("HTTP base url")
+ private String baseUrl;
+
+ @ApiModelProperty("Whether to enable credential")
+ private Boolean enableCredential;
+
+ @ApiModelProperty("Max connect count")
+ private Integer maxConnect;
+ /**
+ * Get the dto instance from the request
+ */
+ public static HttpDataNodeDTO getFromRequest(HttpDataNodeRequest request,
String extParams) {
+ HttpDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+ ? HttpDataNodeDTO.getFromJson(extParams)
+ : new HttpDataNodeDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static HttpDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, HttpDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+ String.format("Failed to parse extParams for Cloud log
service node: %s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java
new file mode 100644
index 0000000000..b24224134c
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.http;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * HTTP data node info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HTTP)
+@ApiModel("HTTP data node info")
+public class HttpDataNodeInfo extends DataNodeInfo {
+
+ @ApiModelProperty("HTTP base url")
+ private String baseUrl;
+
+ @ApiModelProperty("Whether to enable credential")
+ private Boolean enableCredential;
+
+ @ApiModelProperty("Max connect count")
+ private Integer maxConnect;
+
+ public HttpDataNodeInfo() {
+ setType(DataNodeType.HTTP);
+ }
+
+ @Override
+ public DataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, HttpDataNodeRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java
similarity index 51%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java
index 09115a9e02..1fba8d9518 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java
@@ -15,26 +15,39 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.node.http;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
/**
- * The base parameter class of StreamSink, support user extend their own
business params.
+ * HTTP data node request
*/
@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HTTP)
+@ApiModel("Http service data node request")
+public class HttpDataNodeRequest extends DataNodeRequest {
+
+ @ApiModelProperty("HTTP base url")
+ private String baseUrl;
+
+ @ApiModelProperty("Whether to enable credential")
+ private Boolean enableCredential;
+
+ @ApiModelProperty("Max connect count")
+ private Integer maxConnect;
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+ public HttpDataNodeRequest() {
+ this.setType(DataNodeType.HTTP);
+ }
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
index 09115a9e02..d8df7f4a28 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
@@ -32,6 +32,9 @@ import lombok.NoArgsConstructor;
@ApiModel("Base info of stream sink")
public class BaseStreamSink {
+ @ApiModelProperty("Transform sql")
+ private String transformSql;
+
@ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
private String startConsumeTime;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index 8c190a3a84..24c544b943 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -71,6 +71,9 @@ public abstract class SinkRequest {
@Pattern(regexp = "^[a-zA-Z0-9_.-]{1,100}$", message = "sinkName only
supports letters, numbers, '.', '-', or '_'")
private String sinkName;
+ @ApiModelProperty("Transform sql")
+ private String transformSql;
+
@ApiModelProperty("Sink description")
@Length(max = 500, message = "length must be less than or equal to 500")
private String description;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
index 7641256de0..85fd72a1a4 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
@@ -68,6 +68,9 @@ public abstract class StreamSink extends StreamNode {
@ApiModelProperty("Sink name, unique in one stream.")
private String sinkName;
+ @ApiModelProperty("Transform sql")
+ private String transformSql;
+
@ApiModelProperty("Sink description")
private String description;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java
new file mode 100644
index 0000000000..fdcd7228fe
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.http;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Map;
+
+/**
+ * HTTP sink info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "HTTP sink info")
+@JsonTypeDefine(value = SinkType.HTTP)
+public class HttpSink extends StreamSink {
+
+ @ApiModelProperty("HTTP path")
+ private String path;
+
+ @ApiModelProperty("HTTP method, like POST, GET")
+ private String method;
+
+ @ApiModelProperty("HTTP headers")
+ private Map<String, String> headers;
+
+ @ApiModelProperty("Max retry times")
+ private Integer maxRetryTimes;
+
+ public HttpSink() {
+ this.setSinkType(SinkType.HTTP);
+ }
+
+ @Override
+ public SinkRequest genSinkRequest() {
+ return CommonBeanUtils.copyProperties(this, HttpSinkRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java
new file mode 100644
index 0000000000..5431e07ec7
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.http;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.Map;
+
+/**
+ * Sink info of HTTP
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class HttpSinkDTO extends BaseStreamSink {
+
+ @ApiModelProperty("HTTP path")
+ private String path;
+
+ @ApiModelProperty("HTTP method, like POST, GET")
+ private String method;
+
+ @ApiModelProperty("HTTP headers")
+ private Map<String, String> headers;
+
+ @ApiModelProperty("Max retry times")
+ private Integer maxRetryTimes;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static HttpSinkDTO getFromRequest(HttpSinkRequest request, String
extParams) {
+ HttpSinkDTO dto = StringUtils.isNotBlank(extParams)
+ ? HttpSinkDTO.getFromJson(extParams)
+ : new HttpSinkDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ public static HttpSinkDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, HttpSinkDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of http SinkDTO failure:
%s", e.getMessage()));
+ }
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java
similarity index 53%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java
index 09115a9e02..207322aa3a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java
@@ -15,26 +15,40 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.pojo.sink.http;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.Map;
/**
- * The base parameter class of StreamSink, support user extend their own
business params.
+ * Http sink request.
*/
@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Http sink request")
+@JsonTypeDefine(value = SinkType.HTTP)
+public class HttpSinkRequest extends SinkRequest {
+
+ @ApiModelProperty("HTTP path")
+ private String path;
+
+ @ApiModelProperty("HTTP method, like POST, GET")
+ private String method;
+
+ @ApiModelProperty("HTTP headers")
+ private Map<String, String> headers;
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+ @ApiModelProperty("Max retry times")
+ private Integer maxRetryTimes;
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
index 6ffdbafae0..b328a39b37 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo;
import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.sort.http.SortHttpClusterInfo;
import org.apache.inlong.manager.pojo.cluster.sort.kafka.SortKafkaClusterInfo;
import
org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo;
import org.apache.inlong.manager.pojo.sort.BaseSortClusterDTO;
@@ -50,6 +51,7 @@ public class SortClusterOperator extends
AbstractClusterOperator {
private static final Set<String> SORT_CLUSTER_SET = new HashSet<String>() {
{
+ add(ClusterType.SORT_HTTP);
add(ClusterType.SORT_CLS);
add(ClusterType.SORT_PULSAR);
add(ClusterType.SORT_ES);
@@ -84,6 +86,9 @@ public class SortClusterOperator extends
AbstractClusterOperator {
ClusterInfo sortClusterInfo;
switch (entity.getType()) {
+ case ClusterType.SORT_HTTP:
+ sortClusterInfo = new SortHttpClusterInfo();
+ break;
case ClusterType.SORT_CLS:
sortClusterInfo = new SortClsClusterInfo();
break;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java
new file mode 100644
index 0000000000..18482d4a04
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.http;
+
+import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig;
+import org.apache.inlong.common.pojo.sort.node.NodeConfig;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class HttpDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HttpDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
+ HttpDataNodeRequest httpDataNodeRequest = (HttpDataNodeRequest)
request;
+ CommonBeanUtils.copyProperties(httpDataNodeRequest, targetEntity,
true);
+ try {
+ HttpDataNodeDTO dto =
HttpDataNodeDTO.getFromRequest(httpDataNodeRequest,
targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("Failed to build extParams for Cloud log
service node: %s", e.getMessage()));
+ }
+ }
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return DataNodeType.HTTP.equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.HTTP;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+ HttpDataNodeInfo info = new HttpDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, info);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ HttpDataNodeDTO dto =
HttpDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, info);
+ }
+ return info;
+ }
+
+ @Override
+ public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
+ DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
+ HttpNodeConfig httpNodeConfig =
CommonBeanUtils.copyProperties(dataNodeInfo, HttpNodeConfig::new);
+ HttpDataNodeDTO dto =
HttpDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, httpNodeConfig);
+ httpNodeConfig.setPassword(dataNodeEntity.getToken());
+ httpNodeConfig.setNodeName(dataNodeInfo.getName());
+ return httpNodeConfig;
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java
new file mode 100644
index 0000000000..030c0c787a
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.http;
+
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.http.HttpSink;
+import org.apache.inlong.manager.pojo.sink.http.HttpSinkDTO;
+import org.apache.inlong.manager.pojo.sink.http.HttpSinkRequest;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink operator for sinks of type {@link SinkType#HTTP}.
+ *
+ * <p>It serializes HTTP sink requests into the entity's extended parameters, rebuilds
+ * {@link HttpSink} objects from stored entities (merging in the parameters of the
+ * referenced HTTP data node), and produces the {@link HttpSinkConfig} used for sort.
+ */
+@Service
+public class HttpSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
+
+    /**
+     * Fills the extended parameters of the target entity from an HTTP sink request.
+     *
+     * @param request sink request, expected to be an {@link HttpSinkRequest}
+     * @param targetEntity entity whose {@code extParams} is overwritten with the serialized DTO
+     * @throws BusinessException with {@code SINK_TYPE_NOT_SUPPORT} when the request's sink type is
+     *         not HTTP, or with {@code SINK_SAVE_FAILED} when building/serializing the DTO fails
+     */
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+        HttpSinkRequest sinkRequest = (HttpSinkRequest) request;
+        try {
+            HttpSinkDTO dto = HttpSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // The BusinessException below only carries the message text, so log the
+            // full cause here to keep the stack trace available for diagnosis.
+            LOGGER.error("serialize extParams of http SinkDTO failure", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of http SinkDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * @return the sink type handled by this operator, {@link SinkType#HTTP}
+     */
+    @Override
+    protected String getSinkType() {
+        return SinkType.HTTP;
+    }
+
+    /**
+     * Tells whether this operator handles the given sink type.
+     *
+     * @param sinkType sink type to check, may be null
+     * @return true only when the given type equals {@link SinkType#HTTP}
+     */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.HTTP.equals(sinkType);
+    }
+
+    /**
+     * Rebuilds an {@link HttpSink} from a stored sink entity.
+     *
+     * <p>Properties are copied in this order, later copies being applied on top of earlier
+     * ones: the entity itself, the sink's own extended parameters, then the extended
+     * parameters of the HTTP data node referenced by the entity. The sink field list is
+     * loaded by sink id.
+     *
+     * @param entity stored sink entity; when null an empty {@link HttpSink} is returned
+     * @return the populated HTTP sink
+     * @throws BusinessException with {@code DATA_NODE_NOT_FOUND} when the referenced HTTP
+     *         data node does not exist
+     */
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        HttpSink sink = new HttpSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        HttpSinkDTO dto = HttpSinkDTO.getFromJson(entity.getExtParams());
+        DataNodeEntity dataNodeEntity = dataNodeEntityMapper.selectByUniqueKey(entity.getDataNodeName(),
+                DataNodeType.HTTP);
+        if (dataNodeEntity == null) {
+            // Fail with a descriptive business error instead of a NullPointerException
+            // when the sink points at a data node that is missing.
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND,
+                    String.format("http data node [%s] not found for sink id=%s",
+                            entity.getDataNodeName(), entity.getId()));
+        }
+        HttpDataNodeDTO httpDataNodeDTO = JsonUtils.parseObject(dataNodeEntity.getExtParams(),
+                HttpDataNodeDTO.class);
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        CommonBeanUtils.copyProperties(httpDataNodeDTO, sink, true);
+        List<SinkField> sinkFields = getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    /**
+     * Builds the {@link HttpSinkConfig} for the given sink.
+     *
+     * <p>The config is copied from the sink, and its field configs are derived from the sink
+     * fields stored for the sink id: each field keeps its name and gets a format resolved
+     * from its lower-cased field type.
+     *
+     * @param groupInfo inlong group info (not read by this implementation)
+     * @param streamInfo inlong stream info (not read by this implementation)
+     * @param sink sink to convert, expected to be an {@link HttpSink}
+     * @return the sink config carrying the field configs
+     */
+    @Override
+    public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
+        HttpSink httpSink = (HttpSink) sink;
+        HttpSinkConfig sinkConfig = CommonBeanUtils.copyProperties(httpSink, HttpSinkConfig::new);
+        List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
+                v -> {
+                    FieldConfig fieldConfig = new FieldConfig();
+                    FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+                            v.getFieldType().toLowerCase());
+                    fieldConfig.setName(v.getFieldName());
+                    fieldConfig.setFormatInfo(formatInfo);
+                    return fieldConfig;
+                }).collect(Collectors.toList());
+        sinkConfig.setFieldConfigs(fields);
+        return sinkConfig;
+    }
+}