This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit f21002f767f0ce52e81e57984ca6c84f8b6fc884 Author: Lightman <[email protected]> AuthorDate: Fri May 27 21:52:05 2022 +0800 [feature] Support compression prop (#8923) --- be/src/olap/rowset/segment_v2/column_writer.cpp | 4 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 10 +++-- be/src/olap/rowset/segment_v2/segment_writer.h | 3 +- be/src/olap/tablet_meta.cpp | 43 ++++++++++++++++++---- be/src/olap/tablet_meta.h | 3 +- be/src/olap/tablet_schema.cpp | 2 + be/src/olap/tablet_schema.h | 3 ++ .../java/org/apache/doris/alter/RollupJobV2.java | 3 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 3 +- .../java/org/apache/doris/backup/RestoreJob.java | 3 +- .../java/org/apache/doris/catalog/Catalog.java | 29 +++++++++++++-- .../java/org/apache/doris/catalog/OlapTable.java | 16 ++++++++ .../org/apache/doris/catalog/TableProperty.java | 16 +++++++- .../apache/doris/common/util/PropertyAnalyzer.java | 32 ++++++++++++++++ .../org/apache/doris/master/ReportHandler.java | 3 +- .../org/apache/doris/task/CreateReplicaTask.java | 10 ++++- .../org/apache/doris/catalog/CreateTableTest.java | 10 +++++ .../java/org/apache/doris/task/AgentTaskTest.java | 3 +- gensrc/proto/olap_file.proto | 2 + gensrc/thrift/AgentService.thrift | 31 ++++++++++++++++ 20 files changed, 202 insertions(+), 27 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index bcf2b2a338..65696a816f 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -120,7 +120,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* length_options.meta->set_is_nullable(false); length_options.meta->set_length(get_scalar_type_info<OLAP_FIELD_TYPE_UNSIGNED_INT>()->size()); length_options.meta->set_encoding(DEFAULT_ENCODING); - length_options.meta->set_compression(LZ4F); + length_options.meta->set_compression(opts.meta->compression()); length_options.need_zone_map = false; length_options.need_bloom_filter = false; @@ -147,7 +147,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* null_options.meta->set_is_nullable(false); null_options.meta->set_length(get_scalar_type_info<OLAP_FIELD_TYPE_TINYINT>()->size()); null_options.meta->set_encoding(DEFAULT_ENCODING); - null_options.meta->set_compression(LZ4F); + null_options.meta->set_compression(opts.meta->compression()); null_options.need_zone_map = false; null_options.need_bloom_filter = false; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 669c770655..896c5e793c 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -51,17 +51,19 @@ SegmentWriter::~SegmentWriter() { }; void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, - const TabletColumn& column) { + const TabletColumn& column, + const TabletSchema* tablet_schema) { // TODO(zc): Do we need this column_id?? meta->set_column_id((*column_id)++); meta->set_unique_id(column.unique_id()); meta->set_type(column.type()); meta->set_length(column.length()); meta->set_encoding(DEFAULT_ENCODING); - meta->set_compression(LZ4F); + meta->set_compression(tablet_schema->compression_type()); meta->set_is_nullable(column.is_nullable()); for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { - init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); + init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i), + tablet_schema); } } @@ -72,7 +74,7 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused)) ColumnWriterOptions opts; opts.meta = _footer.add_columns(); - init_column_meta(opts.meta, &column_id, column); + init_column_meta(opts.meta, &column_id, column, _tablet_schema); // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS // and not support zone map for array type. diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index ebba30fe70..ad3b608dd6 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -68,7 +68,8 @@ public: Status finalize(uint64_t* segment_file_size, uint64_t* index_size); - static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, const TabletColumn& column); + static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id, + const TabletColumn& column, const TabletSchema* tablet_schema); private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 5ad3aa71a7..c4696d10c8 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -43,7 +43,7 @@ OLAPStatus TabletMeta::create(const TCreateTabletReq& request, const TabletUid& request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id, col_ordinal_to_unique_id, tablet_uid, request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK, - request.storage_medium)); + request.storage_medium, request.compression_type)); return OLAP_SUCCESS; } @@ -54,7 +54,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id uint32_t next_unique_id, const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, TabletUid tablet_uid, TTabletType::type tabletType, - TStorageMedium::type t_storage_medium) + TStorageMedium::type t_storage_medium, + TCompressionType::type compression_type) : _tablet_uid(0, 0), _schema(new TabletSchema) { TabletMetaPB tablet_meta_pb; tablet_meta_pb.set_table_id(table_id); @@ -88,14 +89,40 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id LOG(WARNING) << "unknown tablet keys type"; break; } + // compress_kind used to compress segment files schema->set_compress_kind(COMPRESS_LZ4); - switch(tablet_schema.sort_type) { - case TSortType::type::ZORDER: - schema->set_sort_type(SortType::ZORDER); - break; - default: - schema->set_sort_type(SortType::LEXICAL); + // compression_type used to compress segment page + switch (compression_type) { + case TCompressionType::NO_COMPRESSION: + schema->set_compression_type(NO_COMPRESSION); + break; + case TCompressionType::SNAPPY: + schema->set_compression_type(SNAPPY); + break; + case TCompressionType::LZ4: + schema->set_compression_type(LZ4); + break; + case TCompressionType::LZ4F: + schema->set_compression_type(LZ4F); + break; + case TCompressionType::ZLIB: + schema->set_compression_type(ZLIB); + break; + case TCompressionType::ZSTD: + schema->set_compression_type(ZSTD); + break; + default: + schema->set_compression_type(LZ4F); + break; + } + + switch (tablet_schema.sort_type) { + case TSortType::type::ZORDER: + schema->set_sort_type(SortType::ZORDER); + break; + default: + schema->set_sort_type(SortType::LEXICAL); } schema->set_sort_col_num(tablet_schema.sort_col_num); tablet_meta_pb.set_in_restore_mode(false); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index b43e7d0727..5654fee29e 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -83,7 +83,8 @@ public: uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id, const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, TabletUid tablet_uid, TTabletType::type tabletType, - TStorageMedium::type t_storage_medium); + TStorageMedium::type t_storage_medium, + TCompressionType::type compression_type); // If need add a filed in TableMeta, filed init copy in copy construct function TabletMeta(const TabletMeta& tablet_meta); TabletMeta(TabletMeta&& tablet_meta) = delete; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 782ae1a7ac..8f67d7f512 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -426,6 +426,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _sequence_col_idx = schema.sequence_col_idx(); _sort_type = schema.sort_type(); _sort_col_num = schema.sort_col_num(); + _compression_type = schema.compression_type(); } void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) { @@ -446,6 +447,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) { tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx); tablet_meta_pb->set_sort_type(_sort_type); tablet_meta_pb->set_sort_col_num(_sort_col_num); + tablet_meta_pb->set_compression_type(_compression_type); } uint32_t TabletSchema::mem_size() const { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index f9bd519cc1..4fbdf3b2e0 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -21,6 +21,7 @@ #include <vector> #include "gen_cpp/olap_file.pb.h" +#include "gen_cpp/segment_v2.pb.h" #include "olap/olap_define.h" #include "olap/types.h" @@ -143,6 +144,7 @@ public: inline void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; } inline bool has_sequence_col() const { return _sequence_col_idx != -1; } inline int32_t sequence_col_idx() const { return _sequence_col_idx; } + inline segment_v2::CompressionTypePB compression_type() const { return _compression_type; } vectorized::Block create_block(const std::vector<uint32_t>& return_columns, const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const; @@ -165,6 +167,7 @@ private: size_t _num_short_key_columns = 0; size_t _num_rows_per_row_block = 0; CompressKind _compress_kind = COMPRESS_NONE; + segment_v2::CompressionTypePB _compression_type = segment_v2::CompressionTypePB::LZ4F; size_t _next_column_unique_id = 0; bool _has_bf_fpp = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index a328fb0400..6c1ea1148d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -232,7 +232,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch, tbl.getCopiedIndexes(), tbl.isInMemory(), - tabletType); + tabletType, + tbl.getCompressionType()); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 856df527ea..897ac2cd4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -254,7 +254,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { originKeysType, TStorageType.COLUMN, storageMedium, shadowSchema, bfColumns, bfFpp, countDownLatch, indexes, tbl.isInMemory(), - tbl.getPartitionInfo().getTabletType(partitionId)); + tbl.getPartitionInfo().getTabletType(partitionId), + tbl.getCompressionType()); createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId), originSchemaHash); if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index f29ab14be1..b30e13d52f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -963,7 +963,8 @@ public class RestoreJob extends AbstractJob { indexMeta.getSchema(), bfColumns, bfFpp, null, localTbl.getCopiedIndexes(), localTbl.isInMemory(), - localTbl.getPartitionInfo().getTabletType(restorePart.getId())); + localTbl.getPartitionInfo().getTabletType(restorePart.getId()), + localTbl.getCompressionType()); task.setInRestoreMode(true); batchTask.addTask(task); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index bf04888bf3..7685327ef1 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -241,6 +241,7 @@ import org.apache.doris.task.CreateReplicaTask; import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; @@ -3003,6 +3004,7 @@ public class Catalog { * 6.2. replicationNum * 6.3. inMemory * 6.4. storageFormat + * 6.5. compressionType * 7. set index meta * 8. check colocation properties * 9. create tablet in BE @@ -3281,6 +3283,7 @@ public class Catalog { singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(), singlePartitionDesc.getTabletType(), + olapTable.getCompressionType(), olapTable.getDataSortInfo() ); @@ -3512,6 +3515,7 @@ public class Catalog { boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, + TCompressionType compressionType, DataSortInfo dataSortInfo) throws DdlException { // create base index first. Preconditions.checkArgument(baseIndexId != -1); @@ -3579,7 +3583,8 @@ public class Catalog { indexes, isInMemory, tabletType, - dataSortInfo); + dataSortInfo, + compressionType); task.setStorageFormat(storageFormat); batchTask.addTask(task); // add to AgentTaskQueue for handling finish report. @@ -3697,6 +3702,15 @@ public class Catalog { } olapTable.setStorageFormat(storageFormat); + // get compression type + TCompressionType compressionType = TCompressionType.LZ4; + try { + compressionType = PropertyAnalyzer.analyzeCompressionType(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setCompressionType(compressionType); + // check data sort properties DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType, keysDesc.keysColumnSize(), storageFormat); @@ -3741,6 +3755,7 @@ public class Catalog { throw new DdlException(e.getMessage()); } + if (partitionInfo.getType() == PartitionType.UNPARTITIONED) { // if this is an unpartitioned table, we should analyze data property and replication num here. // if this is a partitioned table, there properties are already analyzed in RangePartitionDesc analyze phase. @@ -3877,7 +3892,7 @@ public class Catalog { partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), - isInMemory, storageFormat, tabletType, olapTable.getDataSortInfo()); + isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo()); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { try { @@ -3928,7 +3943,8 @@ public class Catalog { versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, - partitionInfo.getTabletType(entry.getValue()), olapTable.getDataSortInfo()); + partitionInfo.getTabletType(entry.getValue()), + compressionType, olapTable.getDataSortInfo()); olapTable.addPartition(partition); } } else { @@ -4272,6 +4288,12 @@ public class Catalog { sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\" = \""); sb.append(olapTable.getStorageFormat()).append("\""); + // compression type + if (olapTable.getCompressionType() != TCompressionType.LZ4F) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPRESSION).append("\" = \""); + sb.append(olapTable.getCompressionType()).append("\""); + } + sb.append("\n)"); } else if (table.getType() == TableType.MYSQL) { MysqlTable mysqlTable = (MysqlTable) table; @@ -6800,6 +6822,7 @@ public class Catalog { copiedTbl.isInMemory(), copiedTbl.getStorageFormat(), copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), + copiedTbl.getCompressionType(), copiedTbl.getDataSortInfo()); newPartitions.add(newPartition); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 253d53521e..77e22eb118 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -48,6 +48,7 @@ import org.apache.doris.qe.OriginStatement; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TOlapTable; import org.apache.doris.thrift.TSortType; import org.apache.doris.thrift.TStorageFormat; @@ -1672,6 +1673,14 @@ public class OlapTable extends Table { return !tempPartitions.isEmpty(); } + public void setCompressionType(TCompressionType compressionType) { + if (tableProperty == null) { + tableProperty = new TableProperty(new HashMap<>()); + } + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPRESSION, compressionType.name()); + tableProperty.buildCompressionType(); + } + public void setStorageFormat(TStorageFormat storageFormat) { if (tableProperty == null) { tableProperty = new TableProperty(new HashMap<>()); @@ -1687,6 +1696,13 @@ public class OlapTable extends Table { return tableProperty.getStorageFormat(); } + public TCompressionType getCompressionType() { + if (tableProperty == null) { + return TCompressionType.LZ4F; + } + return tableProperty.getCompressionType(); + } + public DataSortInfo getDataSortInfo() { if (tableProperty == null) { return new DataSortInfo(TSortType.LEXICAL, this.getKeysNum()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 844e4032c3..91ef9b0c32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -25,6 +25,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.persist.OperationType; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TStorageFormat; import com.google.common.base.Strings; @@ -68,6 +69,8 @@ public class TableProperty implements Writable { */ private TStorageFormat storageFormat = TStorageFormat.DEFAULT; + private TCompressionType compressionType = TCompressionType.LZ4F; + private DataSortInfo dataSortInfo = new DataSortInfo(); public TableProperty(Map<String, String> properties) { @@ -145,6 +148,12 @@ public class TableProperty implements Writable { return this; } + public TableProperty buildCompressionType() { + compressionType = TCompressionType.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPRESSION, + TCompressionType.LZ4F.name())); + return this; + } + public TableProperty buildStorageFormat() { storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, TStorageFormat.DEFAULT.name())); @@ -210,6 +219,10 @@ public class TableProperty implements Writable { return dataSortInfo; } + public TCompressionType getCompressionType() { + return compressionType; + } + public void buildReplicaAllocation() { try { // Must copy the properties because "analyzeReplicaAllocation" with remove the property @@ -233,7 +246,8 @@ public class TableProperty implements Writable { .executeBuildDynamicProperty() .buildInMemory() .buildStorageFormat() - .buildDataSortInfo(); + .buildDataSortInfo() + .buildCompressionType(); if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) { // get replica num from property map and create replica allocation String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index a4684f09c9..c4e186c4e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -30,6 +30,8 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.resource.Tag; +import org.apache.doris.thrift.TCompressionType; +import org.apache.doris.thrift.TSortType; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -73,6 +75,7 @@ public class PropertyAnalyzer { public static final String PROPERTIES_COLOCATE_WITH = "colocate_with"; public static final String PROPERTIES_TIMEOUT = "timeout"; + public static final String PROPERTIES_COMPRESSION = "compression"; public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type"; public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = "send_clear_alter_tasks"; @@ -384,6 +387,35 @@ public class PropertyAnalyzer { return timeout; } + // analyzeCompressionType will parse the compression type from properties + public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException { + String compressionType = ""; + if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) { + compressionType = properties.get(PROPERTIES_COMPRESSION); + properties.remove(PROPERTIES_COMPRESSION); + } else { + return TCompressionType.LZ4F; + } + + if (compressionType.equalsIgnoreCase("no_compression")) { + return TCompressionType.NO_COMPRESSION; + } else if (compressionType.equalsIgnoreCase("lz4")) { + return TCompressionType.LZ4; + } else if (compressionType.equalsIgnoreCase("lz4f")) { + return TCompressionType.LZ4F; + } else if (compressionType.equalsIgnoreCase("zlib")) { + return TCompressionType.ZLIB; + } else if (compressionType.equalsIgnoreCase("zstd")) { + return TCompressionType.ZSTD; + } else if (compressionType.equalsIgnoreCase("snappy")) { + return TCompressionType.SNAPPY; + } else if (compressionType.equalsIgnoreCase("default_compression")) { + return TCompressionType.LZ4F; + } else { + throw new AnalysisException("unknown compression type: " + compressionType); + } + } + // analyzeStorageFormat will parse the storage format from properties // sql: alter table tablet_name set ("storage_format" = "v2") // Use this sql to convert all tablets(base and rollup index) to a new format segment diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 05643aa7e2..d875fecbe6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -603,7 +603,8 @@ public class ReportHandler extends Daemon { TStorageMedium.HDD, indexMeta.getSchema(), bfColumns, bfFpp, null, olapTable.getCopiedIndexes(), olapTable.isInMemory(), - olapTable.getPartitionInfo().getTabletType(partitionId)); + olapTable.getPartitionInfo().getTabletType(partitionId), + olapTable.getCompressionType()); createReplicaTask.setIsRecoverTask(true); createReplicaBatchTask.addTask(createReplicaTask); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 7ecf25979d..5f59d5ff0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.KeysType; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Status; import org.apache.doris.thrift.TColumn; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TCreateTabletReq; import org.apache.doris.thrift.TOlapTableIndex; import org.apache.doris.thrift.TStatusCode; @@ -54,6 +55,7 @@ public class CreateReplicaTask extends AgentTask { private KeysType keysType; private TStorageType storageType; private TStorageMedium storageMedium; + private TCompressionType compressionType; private List<Column> columns; @@ -93,7 +95,7 @@ public class CreateReplicaTask extends AgentTask { Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch, List<Index> indexes, boolean isInMemory, - TTabletType tabletType) { + TTabletType tabletType, TCompressionType compressionType) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); this.shortKeyColumnCount = shortKeyColumnCount; @@ -104,6 +106,7 @@ public class CreateReplicaTask extends AgentTask { this.keysType = keysType; this.storageType = storageType; this.storageMedium = storageMedium; + this.compressionType = compressionType; this.columns = columns; @@ -125,7 +128,8 @@ public class CreateReplicaTask extends AgentTask { List<Index> indexes, boolean isInMemory, TTabletType tabletType, - DataSortInfo dataSortInfo) { + DataSortInfo dataSortInfo, + TCompressionType compressionType) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); this.shortKeyColumnCount = shortKeyColumnCount; @@ -136,6 +140,7 @@ public class CreateReplicaTask extends AgentTask { this.keysType = keysType; this.storageType = storageType; this.storageMedium = storageMedium; + this.compressionType = compressionType; this.columns = columns; @@ -267,6 +272,7 @@ public class CreateReplicaTask extends AgentTask { } createTabletReq.setTabletType(tabletType); + createTabletReq.setCompressionType(compressionType); return createTabletReq; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index e4f97607d9..082a05cf40 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -138,6 +138,16 @@ public class CreateTableTest { .expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n" + "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');")); + ExceptionChecker + .expectThrowsNoException(() -> createTable("create table test.compression1(key1 int, key2 varchar(10)) \n" + + "distributed by hash(key1) buckets 1 \n" + + "properties('replication_num' = '1', 'compression' = 'lz4f');")); + + ExceptionChecker + .expectThrowsNoException(() -> createTable("create table test.compression2(key1 int, key2 varchar(10)) \n" + + "distributed by hash(key1) buckets 1 \n" + + "properties('replication_num' = '1', 'compression' = 'snappy');")); + ExceptionChecker .expectThrowsNoException(() -> createTable("create table test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n" + "unique key(k1, k2)\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index ca31e8de72..e52bcedce1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.thrift.TAgentTaskRequest; import org.apache.doris.thrift.TBackend; +import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TPriority; import org.apache.doris.thrift.TPushType; import org.apache.doris.thrift.TStorageMedium; @@ -110,7 +111,7 @@ public class AgentTaskTest { version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, - false, TTabletType.TABLET_TYPE_DISK); + false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 026a99ef62..43cb8ac7a0 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto"; import "olap_common.proto"; import "types.proto"; +import "segment_v2.proto"; message ZoneMap { required bytes min = 1; @@ -194,6 +195,7 @@ message TabletSchemaPB { optional int32 sequence_col_idx = 10 [default= -1]; optional SortType sort_type = 11; optional int32 sort_col_num = 12; + optional segment_v2.CompressionTypePB compression_type = 13; } enum TabletStatePB { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index aba8cc9580..62855abfe1 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -67,6 +67,35 @@ enum TTabletType { TABLET_TYPE_MEMORY = 1 } +struct TS3StorageParam { + 1: optional string s3_endpoint + 2: optional string s3_region + 3: optional string s3_ak + 4: optional string s3_sk + 5: optional i32 s3_max_conn = 50 + 6: optional i32 s3_request_timeout_ms = 3000 + 7: optional i32 s3_conn_timeout_ms = 1000 + 8: optional string root_path +} + +struct TStorageParam { + 1: required Types.TStorageMedium storage_medium = TStorageMedium.HDD + 2: required string storage_name = ""; + 3: optional TS3StorageParam s3_storage_param +} + +enum TCompressionType { + UNKNOWN_COMPRESSION = 0, + DEFAULT_COMPRESSION = 1, + NO_COMPRESSION = 2, + SNAPPY = 3, + LZ4 = 4, + LZ4F = 5, + ZLIB = 6, + ZSTD = 7 +} + + struct TCreateTabletReq { 1: required Types.TTabletId tablet_id 2: required TTabletSchema tablet_schema @@ -87,6 +116,8 @@ struct TCreateTabletReq { 12: optional bool is_eco_mode 13: optional TStorageFormat storage_format 14: optional TTabletType tablet_type + 15: optional TStorageParam storage_param + 16: optional TCompressionType compression_type = TCompressionType.LZ4F } struct TDropTabletReq { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
