This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit f21002f767f0ce52e81e57984ca6c84f8b6fc884
Author: Lightman <[email protected]>
AuthorDate: Fri May 27 21:52:05 2022 +0800

    [feature] Support compression prop (#8923)
---
 be/src/olap/rowset/segment_v2/column_writer.cpp    |  4 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 10 +++--
 be/src/olap/rowset/segment_v2/segment_writer.h     |  3 +-
 be/src/olap/tablet_meta.cpp                        | 43 ++++++++++++++++++----
 be/src/olap/tablet_meta.h                          |  3 +-
 be/src/olap/tablet_schema.cpp                      |  2 +
 be/src/olap/tablet_schema.h                        |  3 ++
 .../java/org/apache/doris/alter/RollupJobV2.java   |  3 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  3 +-
 .../java/org/apache/doris/backup/RestoreJob.java   |  3 +-
 .../java/org/apache/doris/catalog/Catalog.java     | 29 +++++++++++++--
 .../java/org/apache/doris/catalog/OlapTable.java   | 16 ++++++++
 .../org/apache/doris/catalog/TableProperty.java    | 16 +++++++-
 .../apache/doris/common/util/PropertyAnalyzer.java | 32 ++++++++++++++++
 .../org/apache/doris/master/ReportHandler.java     |  3 +-
 .../org/apache/doris/task/CreateReplicaTask.java   | 10 ++++-
 .../org/apache/doris/catalog/CreateTableTest.java  | 10 +++++
 .../java/org/apache/doris/task/AgentTaskTest.java  |  3 +-
 gensrc/proto/olap_file.proto                       |  2 +
 gensrc/thrift/AgentService.thrift                  | 31 ++++++++++++++++
 20 files changed, 202 insertions(+), 27 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index bcf2b2a338..65696a816f 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -120,7 +120,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& 
opts, const TabletColumn*
             length_options.meta->set_is_nullable(false);
             
length_options.meta->set_length(get_scalar_type_info<OLAP_FIELD_TYPE_UNSIGNED_INT>()->size());
             length_options.meta->set_encoding(DEFAULT_ENCODING);
-            length_options.meta->set_compression(LZ4F);
+            length_options.meta->set_compression(opts.meta->compression());
 
             length_options.need_zone_map = false;
             length_options.need_bloom_filter = false;
@@ -147,7 +147,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& 
opts, const TabletColumn*
                 null_options.meta->set_is_nullable(false);
                 
null_options.meta->set_length(get_scalar_type_info<OLAP_FIELD_TYPE_TINYINT>()->size());
                 null_options.meta->set_encoding(DEFAULT_ENCODING);
-                null_options.meta->set_compression(LZ4F);
+                null_options.meta->set_compression(opts.meta->compression());
 
                 null_options.need_zone_map = false;
                 null_options.need_bloom_filter = false;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 669c770655..896c5e793c 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -51,17 +51,19 @@ SegmentWriter::~SegmentWriter() {
 };
 
 void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
-                                      const TabletColumn& column) {
+                                     const TabletColumn& column,
+                                     const TabletSchema* tablet_schema) {
     // TODO(zc): Do we need this column_id??
     meta->set_column_id((*column_id)++);
     meta->set_unique_id(column.unique_id());
     meta->set_type(column.type());
     meta->set_length(column.length());
     meta->set_encoding(DEFAULT_ENCODING);
-    meta->set_compression(LZ4F);
+    meta->set_compression(tablet_schema->compression_type());
     meta->set_is_nullable(column.is_nullable());
     for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
-        init_column_meta(meta->add_children_columns(), column_id, 
column.get_sub_column(i));
+        init_column_meta(meta->add_children_columns(), column_id, 
column.get_sub_column(i),
+                         tablet_schema);
     }
 }
 
@@ -72,7 +74,7 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec 
__attribute__((unused))
         ColumnWriterOptions opts;
         opts.meta = _footer.add_columns();
 
-        init_column_meta(opts.meta, &column_id, column);
+        init_column_meta(opts.meta, &column_id, column, _tablet_schema);
 
         // now we create zone map for key columns in AGG_KEYS or all column in 
UNIQUE_KEYS or DUP_KEYS
         // and not support zone map for array type.
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h 
b/be/src/olap/rowset/segment_v2/segment_writer.h
index ebba30fe70..ad3b608dd6 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -68,7 +68,8 @@ public:
 
     Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
 
-    static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, 
const TabletColumn& column);
+    static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
+                                 const TabletColumn& column, const 
TabletSchema* tablet_schema);
 
 private:
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 5ad3aa71a7..c4696d10c8 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -43,7 +43,7 @@ OLAPStatus TabletMeta::create(const TCreateTabletReq& 
request, const TabletUid&
             request.tablet_schema.schema_hash, shard_id, 
request.tablet_schema, next_unique_id,
             col_ordinal_to_unique_id, tablet_uid,
             request.__isset.tablet_type ? request.tablet_type : 
TTabletType::TABLET_TYPE_DISK,
-            request.storage_medium));
+            request.storage_medium, request.compression_type));
     return OLAP_SUCCESS;
 }
 
@@ -54,7 +54,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t 
partition_id, int64_t tablet_id
                        uint32_t next_unique_id,
                        const std::unordered_map<uint32_t, uint32_t>& 
col_ordinal_to_unique_id,
                        TabletUid tablet_uid, TTabletType::type tabletType,
-                       TStorageMedium::type t_storage_medium)
+                       TStorageMedium::type t_storage_medium,
+                       TCompressionType::type compression_type)
         : _tablet_uid(0, 0), _schema(new TabletSchema) {
     TabletMetaPB tablet_meta_pb;
     tablet_meta_pb.set_table_id(table_id);
@@ -88,14 +89,40 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t 
partition_id, int64_t tablet_id
         LOG(WARNING) << "unknown tablet keys type";
         break;
     }
+    // compress_kind is used to compress segment files
     schema->set_compress_kind(COMPRESS_LZ4);
 
-    switch(tablet_schema.sort_type) {
-        case TSortType::type::ZORDER:
-            schema->set_sort_type(SortType::ZORDER);
-            break;
-        default:
-            schema->set_sort_type(SortType::LEXICAL);
+    // compression_type is used to compress segment pages
+    switch (compression_type) {
+    case TCompressionType::NO_COMPRESSION:
+        schema->set_compression_type(NO_COMPRESSION);
+        break;
+    case TCompressionType::SNAPPY:
+        schema->set_compression_type(SNAPPY);
+        break;
+    case TCompressionType::LZ4:
+        schema->set_compression_type(LZ4);
+        break;
+    case TCompressionType::LZ4F:
+        schema->set_compression_type(LZ4F);
+        break;
+    case TCompressionType::ZLIB:
+        schema->set_compression_type(ZLIB);
+        break;
+    case TCompressionType::ZSTD:
+        schema->set_compression_type(ZSTD);
+        break;
+    default:
+        schema->set_compression_type(LZ4F);
+        break;
+    }
+
+    switch (tablet_schema.sort_type) {
+    case TSortType::type::ZORDER:
+        schema->set_sort_type(SortType::ZORDER);
+        break;
+    default:
+        schema->set_sort_type(SortType::LEXICAL);
     }
     schema->set_sort_col_num(tablet_schema.sort_col_num);
     tablet_meta_pb.set_in_restore_mode(false);
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index b43e7d0727..5654fee29e 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -83,7 +83,8 @@ public:
                uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t 
next_unique_id,
                const std::unordered_map<uint32_t, uint32_t>& 
col_ordinal_to_unique_id,
                TabletUid tablet_uid, TTabletType::type tabletType,
-               TStorageMedium::type t_storage_medium);
+               TStorageMedium::type t_storage_medium,
+               TCompressionType::type compression_type);
     // If need add a filed in TableMeta, filed init copy in copy construct 
function
     TabletMeta(const TabletMeta& tablet_meta);
     TabletMeta(TabletMeta&& tablet_meta) = delete;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 782ae1a7ac..8f67d7f512 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -426,6 +426,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& 
schema) {
     _sequence_col_idx = schema.sequence_col_idx();
     _sort_type = schema.sort_type();
     _sort_col_num = schema.sort_col_num();
+    _compression_type = schema.compression_type();
 }
 
 void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
@@ -446,6 +447,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* 
tablet_meta_pb) {
     tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx);
     tablet_meta_pb->set_sort_type(_sort_type);
     tablet_meta_pb->set_sort_col_num(_sort_col_num);
+    tablet_meta_pb->set_compression_type(_compression_type);
 }
 
 uint32_t TabletSchema::mem_size() const {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index f9bd519cc1..4fbdf3b2e0 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "gen_cpp/olap_file.pb.h"
+#include "gen_cpp/segment_v2.pb.h"
 #include "olap/olap_define.h"
 #include "olap/types.h"
 
@@ -143,6 +144,7 @@ public:
     inline void set_delete_sign_idx(int32_t delete_sign_idx) { 
_delete_sign_idx = delete_sign_idx; }
     inline bool has_sequence_col() const { return _sequence_col_idx != -1; }
     inline int32_t sequence_col_idx() const { return _sequence_col_idx; }
+    inline segment_v2::CompressionTypePB compression_type() const { return 
_compression_type; }
     vectorized::Block create_block(const std::vector<uint32_t>& return_columns,
             const std::unordered_set<uint32_t>* 
tablet_columns_need_convert_null = nullptr) const;
 
@@ -165,6 +167,7 @@ private:
     size_t _num_short_key_columns = 0;
     size_t _num_rows_per_row_block = 0;
     CompressKind _compress_kind = COMPRESS_NONE;
+    segment_v2::CompressionTypePB _compression_type = 
segment_v2::CompressionTypePB::LZ4F;
     size_t _next_column_unique_id = 0;
 
     bool _has_bf_fpp = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index a328fb0400..6c1ea1148d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -232,7 +232,8 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                                 rollupSchema, tbl.getCopiedBfColumns(), 
tbl.getBfFpp(), countDownLatch,
                                 tbl.getCopiedIndexes(),
                                 tbl.isInMemory(),
-                                tabletType);
+                                tabletType,
+                                tbl.getCompressionType());
                         
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), 
baseSchemaHash);
                         if (this.storageFormat != null) {
                             
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 856df527ea..897ac2cd4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -254,7 +254,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     originKeysType, TStorageType.COLUMN, 
storageMedium,
                                     shadowSchema, bfColumns, bfFpp, 
countDownLatch, indexes,
                                     tbl.isInMemory(),
-                                    
tbl.getPartitionInfo().getTabletType(partitionId));
+                                    
tbl.getPartitionInfo().getTabletType(partitionId),
+                                    tbl.getCompressionType());
                             
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, 
shadowIdxId).get(shadowTabletId), originSchemaHash);
                             if (this.storageFormat != null) {
                                 
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index f29ab14be1..b30e13d52f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -963,7 +963,8 @@ public class RestoreJob extends AbstractJob {
                             indexMeta.getSchema(), bfColumns, bfFpp, null,
                             localTbl.getCopiedIndexes(),
                             localTbl.isInMemory(),
-                            
localTbl.getPartitionInfo().getTabletType(restorePart.getId()));
+                            
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
+                            localTbl.getCompressionType());
                     task.setInRestoreMode(true);
                     batchTask.addTask(task);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index bf04888bf3..7685327ef1 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -241,6 +241,7 @@ import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
@@ -3003,6 +3004,7 @@ public class Catalog {
      *     6.2. replicationNum
      *     6.3. inMemory
      *     6.4. storageFormat
+     *     6.5. compressionType
      * 7. set index meta
      * 8. check colocation properties
      * 9. create tablet in BE
@@ -3281,6 +3283,7 @@ public class Catalog {
                     singlePartitionDesc.isInMemory(),
                     olapTable.getStorageFormat(),
                     singlePartitionDesc.getTabletType(),
+                    olapTable.getCompressionType(),
                     olapTable.getDataSortInfo()
             );
 
@@ -3512,6 +3515,7 @@ public class Catalog {
                                                  boolean isInMemory,
                                                  TStorageFormat storageFormat,
                                                  TTabletType tabletType,
+                                                 TCompressionType 
compressionType,
                                                  DataSortInfo dataSortInfo) 
throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
@@ -3579,7 +3583,8 @@ public class Catalog {
                             indexes,
                             isInMemory,
                             tabletType,
-                            dataSortInfo);
+                            dataSortInfo,
+                            compressionType);
                     task.setStorageFormat(storageFormat);
                     batchTask.addTask(task);
                     // add to AgentTaskQueue for handling finish report.
@@ -3697,6 +3702,15 @@ public class Catalog {
         }
         olapTable.setStorageFormat(storageFormat);
 
+        // get compression type
+        TCompressionType compressionType = TCompressionType.LZ4;
+        try {
+            compressionType = 
PropertyAnalyzer.analyzeCompressionType(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        olapTable.setCompressionType(compressionType);
+
         // check data sort properties
         DataSortInfo dataSortInfo = 
PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
                 keysDesc.keysColumnSize(), storageFormat);
@@ -3741,6 +3755,7 @@ public class Catalog {
             throw new DdlException(e.getMessage());
         }
 
+
         if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
             // if this is an unpartitioned table, we should analyze data 
property and replication num here.
             // if this is a partitioned table, there properties are already 
analyzed in RangePartitionDesc analyze phase.
@@ -3877,7 +3892,7 @@ public class Catalog {
                         partitionInfo.getReplicaAllocation(partitionId),
                         versionInfo, bfColumns, bfFpp,
                         tabletIdSet, olapTable.getCopiedIndexes(),
-                        isInMemory, storageFormat, tabletType, 
olapTable.getDataSortInfo());
+                        isInMemory, storageFormat, tabletType, 
compressionType, olapTable.getDataSortInfo());
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE || 
partitionInfo.getType() == PartitionType.LIST) {
                 try {
@@ -3928,7 +3943,8 @@ public class Catalog {
                             versionInfo, bfColumns, bfFpp,
                             tabletIdSet, olapTable.getCopiedIndexes(),
                             isInMemory, storageFormat,
-                            partitionInfo.getTabletType(entry.getValue()), 
olapTable.getDataSortInfo());
+                            partitionInfo.getTabletType(entry.getValue()),
+                            compressionType, olapTable.getDataSortInfo());
                     olapTable.addPartition(partition);
                 }
             } else {
@@ -4272,6 +4288,12 @@ public class Catalog {
             
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\"
 = \"");
             sb.append(olapTable.getStorageFormat()).append("\"");
 
+            // compression type
+            if (olapTable.getCompressionType() != TCompressionType.LZ4F) {
+                
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPRESSION).append("\" = 
\"");
+                sb.append(olapTable.getCompressionType()).append("\"");
+            }
+
             sb.append("\n)");
         } else if (table.getType() == TableType.MYSQL) {
             MysqlTable mysqlTable = (MysqlTable) table;
@@ -6800,6 +6822,7 @@ public class Catalog {
                         copiedTbl.isInMemory(),
                         copiedTbl.getStorageFormat(),
                         
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
+                        copiedTbl.getCompressionType(),
                         copiedTbl.getDataSortInfo());
                 newPartitions.add(newPartition);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 253d53521e..77e22eb118 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -48,6 +48,7 @@ import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TOlapTable;
 import org.apache.doris.thrift.TSortType;
 import org.apache.doris.thrift.TStorageFormat;
@@ -1672,6 +1673,14 @@ public class OlapTable extends Table {
         return !tempPartitions.isEmpty();
     }
 
+    public void setCompressionType(TCompressionType compressionType) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPRESSION, 
compressionType.name());
+        tableProperty.buildCompressionType();
+    }
+
     public void setStorageFormat(TStorageFormat storageFormat) {
         if (tableProperty == null) {
             tableProperty = new TableProperty(new HashMap<>());
@@ -1687,6 +1696,13 @@ public class OlapTable extends Table {
         return tableProperty.getStorageFormat();
     }
 
+    public TCompressionType getCompressionType() {
+        if (tableProperty == null) {
+            return TCompressionType.LZ4F;
+        }
+        return tableProperty.getCompressionType();
+    }
+
     public DataSortInfo getDataSortInfo() {
         if (tableProperty == null) {
             return new DataSortInfo(TSortType.LEXICAL, this.getKeysNum());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 844e4032c3..91ef9b0c32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.persist.OperationType;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.base.Strings;
@@ -68,6 +69,8 @@ public class TableProperty implements Writable {
      */
     private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
 
+    private TCompressionType compressionType = TCompressionType.LZ4F;
+
     private DataSortInfo dataSortInfo = new DataSortInfo();
 
     public TableProperty(Map<String, String> properties) {
@@ -145,6 +148,12 @@ public class TableProperty implements Writable {
         return this;
     }
 
+    public TableProperty buildCompressionType() {
+        compressionType = 
TCompressionType.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPRESSION,
+                TCompressionType.LZ4F.name()));
+        return this;
+    }
+
     public TableProperty buildStorageFormat() {
         storageFormat = 
TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT,
                 TStorageFormat.DEFAULT.name()));
@@ -210,6 +219,10 @@ public class TableProperty implements Writable {
         return dataSortInfo;
     }
 
+    public TCompressionType getCompressionType() {
+        return compressionType;
+    }
+
     public void buildReplicaAllocation() {
         try {
             // Must copy the properties because "analyzeReplicaAllocation" 
with remove the property
@@ -233,7 +246,8 @@ public class TableProperty implements Writable {
                 .executeBuildDynamicProperty()
                 .buildInMemory()
                 .buildStorageFormat()
-                .buildDataSortInfo();
+                .buildDataSortInfo()
+                .buildCompressionType();
         if (Catalog.getCurrentCatalogJournalVersion() < 
FeMetaVersion.VERSION_105) {
             // get replica num from property map and create replica allocation
             String repNum = 
tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index a4684f09c9..c4e186c4e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -30,6 +30,8 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TSortType;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -73,6 +75,7 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_COLOCATE_WITH = "colocate_with";
     
     public static final String PROPERTIES_TIMEOUT = "timeout";
+    public static final String PROPERTIES_COMPRESSION = "compression";
 
     public static final String PROPERTIES_DISTRIBUTION_TYPE = 
"distribution_type";
     public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = 
"send_clear_alter_tasks";
@@ -384,6 +387,35 @@ public class PropertyAnalyzer {
         return timeout;
     }
 
+    // Parse and remove PROPERTIES_COMPRESSION from properties; LZ4F if missing, AnalysisException if unknown
+    public static TCompressionType analyzeCompressionType(Map<String, String> 
properties) throws  AnalysisException {
+        String compressionType = "";
+        if (properties != null && 
properties.containsKey(PROPERTIES_COMPRESSION)) {
+            compressionType = properties.get(PROPERTIES_COMPRESSION);
+            properties.remove(PROPERTIES_COMPRESSION);
+        } else {
+            return TCompressionType.LZ4F;
+        }
+
+        if (compressionType.equalsIgnoreCase("no_compression")) {
+            return TCompressionType.NO_COMPRESSION;
+        } else if (compressionType.equalsIgnoreCase("lz4")) {
+            return TCompressionType.LZ4;
+        } else if (compressionType.equalsIgnoreCase("lz4f")) {
+            return TCompressionType.LZ4F;
+        } else if (compressionType.equalsIgnoreCase("zlib")) {
+            return TCompressionType.ZLIB;
+        } else if (compressionType.equalsIgnoreCase("zstd")) {
+            return TCompressionType.ZSTD;
+        } else if (compressionType.equalsIgnoreCase("snappy")) {
+            return TCompressionType.SNAPPY;
+        } else if (compressionType.equalsIgnoreCase("default_compression")) {
+            return TCompressionType.LZ4F;
+        } else {
+            throw new AnalysisException("unknown compression type: " + 
compressionType);
+        }
+    }
+
     // analyzeStorageFormat will parse the storage format from properties
     // sql: alter table tablet_name set ("storage_format" = "v2")
     // Use this sql to convert all tablets(base and rollup index) to a new 
format segment
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 05643aa7e2..d875fecbe6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -603,7 +603,8 @@ public class ReportHandler extends Daemon {
                                             TStorageMedium.HDD, 
indexMeta.getSchema(), bfColumns, bfFpp, null,
                                             olapTable.getCopiedIndexes(),
                                             olapTable.isInMemory(),
-                                            
olapTable.getPartitionInfo().getTabletType(partitionId));
+                                            
olapTable.getPartitionInfo().getTabletType(partitionId),
+                                            olapTable.getCompressionType());
                                     createReplicaTask.setIsRecoverTask(true);
                                     
createReplicaBatchTask.addTask(createReplicaTask);
                                 } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 7ecf25979d..5f59d5ff0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Status;
 import org.apache.doris.thrift.TColumn;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TCreateTabletReq;
 import org.apache.doris.thrift.TOlapTableIndex;
 import org.apache.doris.thrift.TStatusCode;
@@ -54,6 +55,7 @@ public class CreateReplicaTask extends AgentTask {
     private KeysType keysType;
     private TStorageType storageType;
     private TStorageMedium storageMedium;
+    private TCompressionType compressionType;
 
     private List<Column> columns;
 
@@ -93,7 +95,7 @@ public class CreateReplicaTask extends AgentTask {
                              Set<String> bfColumns, double bfFpp, 
MarkedCountDownLatch<Long, Long> latch,
                              List<Index> indexes,
                              boolean isInMemory,
-                             TTabletType tabletType) {
+                             TTabletType tabletType, TCompressionType 
compressionType) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, 
indexId, tabletId);
 
         this.shortKeyColumnCount = shortKeyColumnCount;
@@ -104,6 +106,7 @@ public class CreateReplicaTask extends AgentTask {
         this.keysType = keysType;
         this.storageType = storageType;
         this.storageMedium = storageMedium;
+        this.compressionType = compressionType;
 
         this.columns = columns;
 
@@ -125,7 +128,8 @@ public class CreateReplicaTask extends AgentTask {
                              List<Index> indexes,
                              boolean isInMemory,
                              TTabletType tabletType,
-                             DataSortInfo dataSortInfo) {
+                             DataSortInfo dataSortInfo,
+                             TCompressionType compressionType) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, 
indexId, tabletId);
 
         this.shortKeyColumnCount = shortKeyColumnCount;
@@ -136,6 +140,7 @@ public class CreateReplicaTask extends AgentTask {
         this.keysType = keysType;
         this.storageType = storageType;
         this.storageMedium = storageMedium;
+        this.compressionType = compressionType;
 
         this.columns = columns;
 
@@ -267,6 +272,7 @@ public class CreateReplicaTask extends AgentTask {
         }
 
         createTabletReq.setTabletType(tabletType);
+        createTabletReq.setCompressionType(compressionType);
         return createTabletReq;
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index e4f97607d9..082a05cf40 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -138,6 +138,16 @@ public class CreateTableTest {
                 .expectThrowsNoException(() -> createTable("create table 
test.tb7(key1 int, key2 varchar(10)) \n"
                         + "distributed by hash(key1) buckets 1 
properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
 
+        ExceptionChecker
+                .expectThrowsNoException(() -> createTable("create table 
test.compression1(key1 int, key2 varchar(10)) \n"
+                        + "distributed by hash(key1) buckets 1 \n"
+                        + "properties('replication_num' = '1', 'compression' = 
'lz4f');"));
+
+        ExceptionChecker
+                .expectThrowsNoException(() -> createTable("create table 
test.compression2(key1 int, key2 varchar(10)) \n"
+                        + "distributed by hash(key1) buckets 1 \n"
+                        + "properties('replication_num' = '1', 'compression' = 
'snappy');"));
+
         ExceptionChecker
                 .expectThrowsNoException(() -> createTable("create table 
test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
                         + "unique key(k1, k2)\n"
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index ca31e8de72..e52bcedce1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.thrift.TAgentTaskRequest;
 import org.apache.doris.thrift.TBackend;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TStorageMedium;
@@ -110,7 +111,7 @@ public class AgentTaskTest {
                                                   version, KeysType.AGG_KEYS,
                                                   storageType, 
TStorageMedium.SSD,
                                                   columns, null, 0, latch, 
null,
-                                                  false, 
TTabletType.TABLET_TYPE_DISK);
+                                                  false, 
TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
 
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 026a99ef62..43cb8ac7a0 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto";
 
 import "olap_common.proto";
 import "types.proto";
+import "segment_v2.proto";
 
 message ZoneMap {
     required bytes min = 1;
@@ -194,6 +195,7 @@ message TabletSchemaPB {
     optional int32 sequence_col_idx = 10 [default= -1];
     optional SortType sort_type = 11;
     optional int32 sort_col_num = 12;
+    optional segment_v2.CompressionTypePB compression_type = 13;
 }
 
 enum TabletStatePB {
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index aba8cc9580..62855abfe1 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -67,6 +67,35 @@ enum TTabletType {
     TABLET_TYPE_MEMORY = 1
 }
 
+struct TS3StorageParam {
+    1: optional string s3_endpoint
+    2: optional string s3_region
+    3: optional string s3_ak
+    4: optional string s3_sk
+    5: optional i32 s3_max_conn = 50
+    6: optional i32 s3_request_timeout_ms = 3000
+    7: optional i32 s3_conn_timeout_ms = 1000
+    8: optional string root_path
+}
+
+struct TStorageParam {
+    1: required Types.TStorageMedium storage_medium = TStorageMedium.HDD
+    2: required string storage_name = "";
+    3: optional TS3StorageParam s3_storage_param
+}
+
+enum TCompressionType {
+    UNKNOWN_COMPRESSION = 0,
+    DEFAULT_COMPRESSION = 1,
+    NO_COMPRESSION = 2,
+    SNAPPY = 3,
+    LZ4 = 4,
+    LZ4F = 5,
+    ZLIB = 6,
+    ZSTD = 7
+}
+
+
 struct TCreateTabletReq {
     1: required Types.TTabletId tablet_id
     2: required TTabletSchema tablet_schema
@@ -87,6 +116,8 @@ struct TCreateTabletReq {
     12: optional bool is_eco_mode
     13: optional TStorageFormat storage_format
     14: optional TTabletType tablet_type
+    15: optional TStorageParam storage_param
+    16: optional TCompressionType compression_type = TCompressionType.LZ4F
 }
 
 struct TDropTabletReq {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to