This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bebbb4826 [INLONG-9770][Manager] Unified compression type 
configuration (#9771)
2bebbb4826 is described below

commit 2bebbb4826cceba1761b7d3f5a4d4d3522d1e921
Author: fuweng11 <[email protected]>
AuthorDate: Tue Mar 5 16:00:22 2024 +0800

    [INLONG-9770][Manager] Unified compression type configuration (#9771)
---
 .../inlong/common/enums/InlongCompressType.java    | 72 ++++++++++++++++++++++
 .../main/resources/mappers/ClusterSetMapper.xml    |  1 +
 .../manager/pojo/dataproxy/InlongStreamId.java     |  9 +++
 .../manager/pojo/stream/InlongStreamBriefInfo.java |  3 +
 .../manager/pojo/stream/InlongStreamExtParam.java  |  3 +
 .../manager/pojo/stream/InlongStreamInfo.java      |  3 +
 .../manager/pojo/stream/InlongStreamRequest.java   |  3 +
 .../repository/DataProxyConfigRepository.java      |  2 +
 8 files changed, 96 insertions(+)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
new file mode 100644
index 0000000000..24aa16898e
--- /dev/null
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/InlongCompressType.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+import java.util.Objects;
+
+public enum InlongCompressType {
+
+    NONE(0, "NONE", "The message compressed with nothing"),
+    INLONG_GZ(1, "INLONG_GZ", "The message compressed with inlong gz"),
+    INLONG_SNAPPY(2, "INLONG_SNAPPY", "The message compressed with inlong 
snappy"),
+    UNKNOWN(99, "UNKNOWN", "Unknown compress type");
+
+    private final int id;
+    private final String name;
+    private final String desc;
+
+    InlongCompressType(int id, String name, String desc) {
+        this.id = id;
+        this.name = name;
+        this.desc = desc;
+    }
+
+    public static InlongCompressType valueOf(int value) {
+        for (InlongCompressType msgCompressType : InlongCompressType.values()) 
{
+            if (msgCompressType.getId() == value) {
+                return msgCompressType;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    public static InlongCompressType forType(String type) {
+        for (InlongCompressType msgCompressType : InlongCompressType.values()) 
{
+            if (Objects.equals(msgCompressType.getName(), type)) {
+                return msgCompressType;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public String getStrId() {
+        return String.valueOf(id);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+}
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 81972a1c43..04c787d3dd 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -49,6 +49,7 @@
         select inlong_group_id,
                inlong_stream_id,
                data_type,
+               wrap_type,
                mq_resource as topic,
                ext_params
         from inlong_stream
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
index 2d329c3426..9e2d52b6fe 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/dataproxy/InlongStreamId.java
@@ -26,16 +26,25 @@ public class InlongStreamId {
     private String inlongStreamId;
     private String topic;
     private String dataType;
+    private String wrapType;
     private String extParams;
 
     public String getDataType() {
         return dataType;
     }
 
+    public String getWrapType() {
+        return wrapType;
+    }
+
     public void setDataType(String dataType) {
         this.dataType = dataType;
     }
 
+    public void setWrapType(String wrapType) {
+        this.wrapType = wrapType;
+    }
+
     /**
      * get inlongGroupId
      *
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index 646bbfb59f..61cea87a8a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -85,6 +85,9 @@ public class InlongStreamBriefInfo {
     @ApiModelProperty(value = "The message body  wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
 
+    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
+    private String inlongCompressType;
+
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 9b357117c8..bb67615167 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -48,6 +48,9 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
+    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
+    private String inlongCompressType = "NONE";
+
     @ApiModelProperty(value = "Predefined fields")
     private String predefinedFields;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index 257fa5a0ea..2920344bef 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -133,6 +133,9 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "The message body wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
 
+    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
+    private String inlongCompressType;
+
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index fc2b012925..2269410b5e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -121,6 +121,9 @@ public class InlongStreamRequest extends BaseInlongStream {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private boolean ignoreParseError = true;
 
+    @ApiModelProperty(value = "The compression type used for dataproxy and 
sort side data transmission to reduce the network IO overhead")
+    private String inlongCompressType;
+
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index a0a38facfe..44245cb0c4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -92,6 +92,7 @@ public class DataProxyConfigRepository implements IRepository 
{
     public static final String KEY_NEW_TENANT_KEY = "pulsarTenant";
     public static final String KEY_OLD_TENANT_KEY = "tenant";
     public static final String KEY_DATA_TYPE = "dataType";
+    public static final String KEY_WRAP_TYPE = "wrapType";
     public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
     public static final String KEY_BACKUP_TOPIC = "backup_topic";
     public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
@@ -376,6 +377,7 @@ public class DataProxyConfigRepository implements 
IRepository {
         streamIdMap.forEach((k, v) -> {
             Map<String, String> params = fromJsonToMap(v.getExtParams());
             params.computeIfAbsent(KEY_DATA_TYPE, type -> v.getDataType());
+            params.computeIfAbsent(KEY_WRAP_TYPE, type -> v.getWrapType());
             streamParams.put(k, params);
         });
         // reload inlong stream ext

Reply via email to