This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2bebbb4826 [INLONG-9770][Manager] Unified compression type configuration (#9771)
2bebbb4826 is described below
commit 2bebbb4826cceba1761b7d3f5a4d4d3522d1e921
Author: fuweng11 <[email protected]>
AuthorDate: Tue Mar 5 16:00:22 2024 +0800
[INLONG-9770][Manager] Unified compression type configuration (#9771)
---
.../inlong/common/enums/InlongCompressType.java | 72 ++++++++++++++++++++++
.../main/resources/mappers/ClusterSetMapper.xml | 1 +
.../manager/pojo/dataproxy/InlongStreamId.java | 9 +++
.../manager/pojo/stream/InlongStreamBriefInfo.java | 3 +
.../manager/pojo/stream/InlongStreamExtParam.java | 3 +
.../manager/pojo/stream/InlongStreamInfo.java | 3 +
.../manager/pojo/stream/InlongStreamRequest.java | 3 +
.../repository/DataProxyConfigRepository.java | 2 +
8 files changed, 96 insertions(+)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
new file mode 100644
index 0000000000..24aa16898e
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+import java.util.Objects;
+
+public enum InlongCompressType {
+
+ NONE(0, "NONE", "The message compressed with nothing"),
+ INLONG_GZ(1, "INLONG_GZ", "The message compressed with inlong gz"),
+ INLONG_SNAPPY(2, "INLONG_SNAPPY", "The message compressed with inlong snappy"),
+ UNKNOWN(99, "UNKNOWN", "Unknown compress type");
+
+ private final int id;
+ private final String name;
+ private final String desc;
+
+ InlongCompressType(int id, String name, String desc) {
+ this.id = id;
+ this.name = name;
+ this.desc = desc;
+ }
+
+ public static InlongCompressType valueOf(int value) {
+ for (InlongCompressType msgCompressType : InlongCompressType.values()) {
+ if (msgCompressType.getId() == value) {
+ return msgCompressType;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ public static InlongCompressType forType(String type) {
+ for (InlongCompressType msgCompressType : InlongCompressType.values()) {
+ if (Objects.equals(msgCompressType.getName(), type)) {
+ return msgCompressType;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getStrId() {
+ return String.valueOf(id);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 81972a1c43..04c787d3dd 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -49,6 +49,7 @@
select inlong_group_id,
inlong_stream_id,
data_type,
+ wrap_type,
mq_resource as topic,
ext_params
from inlong_stream
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
index 2d329c3426..9e2d52b6fe 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
@@ -26,16 +26,25 @@ public class InlongStreamId {
private String inlongStreamId;
private String topic;
private String dataType;
+ private String wrapType;
private String extParams;
public String getDataType() {
return dataType;
}
+ public String getWrapType() {
+ return wrapType;
+ }
+
public void setDataType(String dataType) {
this.dataType = dataType;
}
+ public void setWrapType(String wrapType) {
+ this.wrapType = wrapType;
+ }
+
/**
* get inlongGroupId
*
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index 646bbfb59f..61cea87a8a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -85,6 +85,9 @@ public class InlongStreamBriefInfo {
@ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
+ @ApiModelProperty(value = "The compression type used for dataproxy and sort side data transmission to reduce the network IO overhead")
+ private String inlongCompressType;
+
@ApiModelProperty(value = "Whether to ignore the parse errors of field value")
private Boolean ignoreParseError;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 9b357117c8..bb67615167 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -48,6 +48,9 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
+ @ApiModelProperty(value = "The compression type used for dataproxy and sort side data transmission to reduce the network IO overhead")
+ private String inlongCompressType = "NONE";
+
@ApiModelProperty(value = "Predefined fields")
private String predefinedFields;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 257fa5a0ea..2920344bef 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -133,6 +133,9 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
+ @ApiModelProperty(value = "The compression type used for dataproxy and sort side data transmission to reduce the network IO overhead")
+ private String inlongCompressType;
+
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index fc2b012925..2269410b5e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,6 +121,9 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "Whether to ignore the parse errors of field value")
private boolean ignoreParseError = true;
+ @ApiModelProperty(value = "The compression type used for dataproxy and sort side data transmission to reduce the network IO overhead")
+ private String inlongCompressType;
+
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index a0a38facfe..44245cb0c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -92,6 +92,7 @@ public class DataProxyConfigRepository implements IRepository {
public static final String KEY_NEW_TENANT_KEY = "pulsarTenant";
public static final String KEY_OLD_TENANT_KEY = "tenant";
public static final String KEY_DATA_TYPE = "dataType";
+ public static final String KEY_WRAP_TYPE = "wrapType";
public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
public static final String KEY_BACKUP_TOPIC = "backup_topic";
public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -376,6 +377,7 @@ public class DataProxyConfigRepository implements IRepository {
streamIdMap.forEach((k, v) -> {
Map<String, String> params = fromJsonToMap(v.getExtParams());
params.computeIfAbsent(KEY_DATA_TYPE, type -> v.getDataType());
+ params.computeIfAbsent(KEY_WRAP_TYPE, type -> v.getWrapType());
streamParams.put(k, params);
});
// reload inlong stream ext