This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f23a594a843 [fix](cluster key) support cloud mode (#40593)
f23a594a843 is described below
commit f23a594a8431ff7ca1219a334c5d7668a6b167a8
Author: meiyi <[email protected]>
AuthorDate: Fri Sep 20 20:59:51 2024 +0800
[fix](cluster key) support cloud mode (#40593)
Cluster keys are now supported in cloud mode.
---
be/src/cloud/cloud_base_compaction.cpp | 2 +-
be/src/cloud/cloud_cumulative_compaction.cpp | 2 +-
be/src/cloud/cloud_tablet.cpp | 11 +++++-----
be/src/cloud/cloud_tablet.h | 2 +-
.../org/apache/doris/alter/CloudRollupJobV2.java | 2 +-
.../apache/doris/alter/CloudSchemaChangeJobV2.java | 6 +++++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 2 +-
.../java/org/apache/doris/analysis/KeysDesc.java | 4 ----
.../cloud/datasource/CloudInternalCatalog.java | 13 ++++++++++--
.../trees/plans/commands/info/CreateTableInfo.java | 3 ---
.../unique_with_mow_c_p0/test_schema_change_ck.out | 24 ----------------------
.../cloud_p0/conf/regression-conf-custom.groovy | 5 -----
.../compaction/test_compaction_uniq_keys_ck.groovy | 4 ----
.../test_compaction_uniq_keys_row_store_ck.groovy | 4 ----
...test_compaction_uniq_keys_with_delete_ck.groovy | 4 ----
.../test_vertical_compaction_uniq_keys_ck.groovy | 4 ----
.../test_schema_change_ck.groovy | 4 +++-
17 files changed, 30 insertions(+), 66 deletions(-)
diff --git a/be/src/cloud/cloud_base_compaction.cpp
b/be/src/cloud/cloud_base_compaction.cpp
index 8cf1131695f..e9753cebe82 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -321,7 +321,7 @@ Status CloudBaseCompaction::modify_rowsets() {
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion,
compaction_type(),
- _stats.merged_rows, initiator, output_rowset_delete_bitmap,
+ _stats.merged_rows, _stats.filtered_rows, initiator,
output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={},
range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 8cee815843c..ea5fa7cc340 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -262,7 +262,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion,
compaction_type(),
- _stats.merged_rows, initiator, output_rowset_delete_bitmap,
+ _stats.merged_rows, _stats.filtered_rows, initiator,
output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction,
tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 576f1da7262..06f7e97e0c4 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -734,7 +734,7 @@ Versions CloudTablet::calc_missed_versions(int64_t
spec_version, Versions existi
Status CloudTablet::calc_delete_bitmap_for_compaction(
const std::vector<RowsetSharedPtr>& input_rowsets, const
RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion, ReaderType compaction_type,
int64_t merged_rows,
- int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
+ int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr&
output_rowset_delete_bitmap,
bool allow_delete_in_cumu_compaction) {
output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
std::set<RowLocation> missed_rows;
@@ -750,11 +750,12 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
if (!allow_delete_in_cumu_compaction) {
if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION &&
tablet_state() == TABLET_RUNNING) {
- if (merged_rows >= 0 && merged_rows != missed_rows_size) {
+ if (merged_rows + filtered_rows >= 0 &&
+ merged_rows + filtered_rows != missed_rows_size) {
std::string err_msg = fmt::format(
- "cumulative compaction: the merged rows({}) is not
equal to missed "
- "rows({}) in rowid conversion, tablet_id: {},
table_id:{}",
- merged_rows, missed_rows_size, tablet_id(),
table_id());
+ "cumulative compaction: the merged rows({}), the
filtered rows({}) is not "
+ "equal to missed rows({}) in rowid conversion,
tablet_id: {}, table_id:{}",
+ merged_rows, filtered_rows, missed_rows_size,
tablet_id(), table_id());
if (config::enable_mow_compaction_correctness_check_core) {
CHECK(false) << err_msg;
} else {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 2bd1ce47502..53747dc19e2 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -176,7 +176,7 @@ public:
const RowsetSharedPtr&
output_rowset,
const RowIdConversion&
rowid_conversion,
ReaderType compaction_type,
int64_t merged_rows,
- int64_t initiator,
+ int64_t filtered_rows, int64_t
initiator,
DeleteBitmapPtr&
output_rowset_delete_bitmap,
bool
allow_delete_in_cumu_compaction);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 57143ed47d7..9914dfc6529 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -226,7 +226,7 @@ public class CloudRollupJobV2 extends RollupJobV2 {
tbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
tbl.getEnableMowLightDelete(), null,
tbl.rowStorePageSize(),
- tbl.variantEnableFlattenNested());
+ tbl.variantEnableFlattenNested(), null);
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
requestBuilder.setDbId(dbId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 0a59ec4c93c..01e11f6d631 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -236,6 +236,10 @@ public class CloudSchemaChangeJobV2 extends
SchemaChangeJobV2 {
short shadowShortKeyColumnCount =
indexShortKeyMap.get(shadowIdxId);
List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
+ List<Integer> clusterKeyIndexes = null;
+ if (shadowIdxId == tbl.getBaseIndexId() ||
isShadowIndexOfBase(shadowIdxId, tbl)) {
+ clusterKeyIndexes =
OlapTable.getClusterKeyIndexes(shadowSchema);
+ }
int shadowSchemaHash =
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int shadowSchemaVersion =
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion;
long originIndexId = indexIdMap.get(shadowIdxId);
@@ -267,7 +271,7 @@ public class CloudSchemaChangeJobV2 extends
SchemaChangeJobV2 {
tbl.getEnableMowLightDelete(),
tbl.getInvertedIndexFileStorageFormat(),
tbl.rowStorePageSize(),
- tbl.variantEnableFlattenNested());
+ tbl.variantEnableFlattenNested(),
clusterKeyIndexes);
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
requestBuilder.setDbId(dbId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index c514bf6306e..ef78611ae9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -217,7 +217,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
partitionOriginIndexIdMap.clear();
}
- private boolean isShadowIndexOfBase(long shadowIdxId, OlapTable tbl) {
+ protected boolean isShadowIndexOfBase(long shadowIdxId, OlapTable tbl) {
if
(indexIdToName.get(shadowIdxId).startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX))
{
String shadowIndexName = indexIdToName.get(shadowIdxId);
String indexName = shadowIndexName
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
index 0076ce74de3..563533ae323 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
@@ -19,7 +19,6 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -118,9 +117,6 @@ public class KeysDesc implements Writable {
}
private void analyzeClusterKeys(List<ColumnDef> cols) throws
AnalysisException {
- if (Config.isCloudMode()) {
- throw new AnalysisException("Cluster key is not supported in cloud
mode");
- }
if (type != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("Cluster keys only support unique keys
table");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 38b1fe4851e..d4d57a6acd2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -72,6 +72,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import doris.segment_v2.SegmentV2;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -155,6 +156,11 @@ public class CloudInternalCatalog extends InternalCatalog {
} else {
indexes = Lists.newArrayList();
}
+ List<Integer> clusterKeyIndexes = null;
+ if (indexId == tbl.getBaseIndexId()) {
+ // only base and shadow index need cluster key indexes
+ clusterKeyIndexes = OlapTable.getClusterKeyIndexes(columns);
+ }
Cloud.CreateTabletsRequest.Builder requestBuilder =
Cloud.CreateTabletsRequest.newBuilder();
List<String> rowStoreColumns =
tbl.getTableProperty().getCopiedRowStoreColumns();
@@ -174,7 +180,7 @@ public class CloudInternalCatalog extends InternalCatalog {
tbl.getEnableMowLightDelete(),
tbl.getInvertedIndexFileStorageFormat(),
tbl.rowStorePageSize(),
- tbl.variantEnableFlattenNested());
+ tbl.variantEnableFlattenNested(), clusterKeyIndexes);
requestBuilder.addTabletMetas(builder);
}
if (!storageVaultIdSet && ((CloudEnv)
Env.getCurrentEnv()).getEnableStorageVault()) {
@@ -224,7 +230,7 @@ public class CloudInternalCatalog extends InternalCatalog {
Long timeSeriesCompactionLevelThreshold, boolean
disableAutoCompaction,
List<Integer> rowStoreColumnUniqueIds, boolean
enableMowLightDelete,
TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat,
long pageSize,
- boolean variantEnableFlattenNested) throws DdlException {
+ boolean variantEnableFlattenNested, List<Integer> clusterKeyIdxes)
throws DdlException {
OlapFile.TabletMetaCloudPB.Builder builder =
OlapFile.TabletMetaCloudPB.newBuilder();
builder.setTableId(tableId);
builder.setIndexId(indexId);
@@ -352,6 +358,9 @@ public class CloudInternalCatalog extends InternalCatalog {
}
schemaBuilder.setRowStorePageSize(pageSize);
schemaBuilder.setEnableVariantFlattenNested(variantEnableFlattenNested);
+ if (!CollectionUtils.isEmpty(clusterKeyIdxes)) {
+ schemaBuilder.addAllClusterKeyIdxes(clusterKeyIdxes);
+ }
OlapFile.TabletSchemaCloudPB schema = schemaBuilder.build();
builder.setSchema(schema);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 04ce3786bb6..9022e9deb7d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -770,9 +770,6 @@ public class CreateTableInfo {
if (!clusterKeysColumnNames.isEmpty()) {
// the same code as KeysDesc#analyzeClusterKeys
- if (Config.isCloudMode()) {
- throw new AnalysisException("Cluster key is not supported in
cloud mode");
- }
if (keysType != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("Cluster keys only support unique
keys table");
}
diff --git
a/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
index 50028960ab1..601d1c83701 100644
--- a/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
+++ b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
@@ -232,30 +232,6 @@
-- !select_add_partition --
10011 200 38 21
10010 200 39 20
-11 \N 38 28
-10 \N 39 29
-13 \N 36 27
-12 \N 37 26
-15 \N 34 20
-14 \N 35 20
-17 \N 32 20
-16 \N 33 20
-19 200 30 20
-18 200 31 20
-119 200 30 20
-118 200 31 20
-117 200 32 20
-116 200 33 20
-115 200 34 25
-114 200 35 24
-113 200 36 23
-112 200 37 22
-111 200 38 21
-110 200 39 20
-211 200 38 21
-210 200 39 20
-311 200 38 21
-310 200 39 20
-- !select_truncate --
13 \N 36 27
diff --git
a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index 8d6b265a54b..30649c4e0ee 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -25,13 +25,9 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + //
keep this line as th
"mv_contain_external_table," + // run on external pipeline
"set_replica_status," + // not a case for cloud mode, no need to run
"test_be_inject_publish_txn_fail," + // not a case for cloud mode, no need
to run
- "test_compaction_uniq_cluster_keys_with_delete," +
- "test_compaction_uniq_keys_cluster_key," +
"test_dump_image," +
"test_index_failure_injection," +
"test_information_schema_external," +
- "test_pk_uk_case_cluster," +
- "test_point_query_cluster_key," +
"test_profile," +
"test_publish_timeout," +
"test_refresh_mtmv," + // not supported yet
@@ -55,7 +51,6 @@ excludeDirectories = "000_the_start_sentinel_do_not_touch," +
// keep this line
"cloud_p0/cache," +
"workload_manager_p1," +
"nereids_rules_p0/subquery," +
- "unique_with_mow_c_p0," +
"backup_restore," + // not a case for cloud mode, no need to run
"cold_heat_separation," +
"storage_medium_p0," +
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
index bedc0f8ee1b..b65557b059c 100644
--- a/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
+++ b/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
@@ -18,10 +18,6 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_compaction_uniq_keys_ck") {
- if (isCloudMode()) {
- logger.info("cloud does not support mow cluster key")
- return
- }
def tableName = "compaction_uniq_keys_ck"
try {
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
index bf4e8dc1a51..73f2f069ca9 100644
---
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
+++
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
@@ -19,10 +19,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_compaction_uniq_keys_row_store_ck", "p0") {
- if (isCloudMode()) {
- logger.info("cloud does not support mow cluster key")
- return
- }
def realDb = "regression_test_serving_p0"
def tableName = realDb + ".test_compaction_uniq_keys_row_store_ck"
sql "CREATE DATABASE IF NOT EXISTS ${realDb}"
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
index a21d73b7f26..21af1a92207 100644
---
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
+++
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
@@ -18,10 +18,6 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_compaction_uniq_keys_with_delete_ck") {
- if (isCloudMode()) {
- logger.info("cloud does not support mow cluster key")
- return
- }
def tableName = "test_compaction_uniq_keys_with_delete_ck"
try {
diff --git
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
index 22c8dc9f84d..66f9274d9d4 100644
---
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
+++
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
@@ -18,10 +18,6 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_vertical_compaction_uniq_keys_ck") {
- if (isCloudMode()) {
- logger.info("cloud does not support mow cluster key")
- return
- }
def tableName = "test_vertical_compaction_uniq_keys_ck"
try {
diff --git
a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
index 840badb6310..2a6729e84af 100644
--- a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
+++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
@@ -30,6 +30,7 @@ suite("test_schema_change_ck") {
}
sql """ DROP TABLE IF EXISTS ${tableName} """
+ if (!isCloudMode()) {
test {
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -47,6 +48,7 @@ suite("test_schema_change_ck") {
"""
exception "Unique merge-on-write table with cluster keys must enable
light schema change"
}
+ }
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`c1` int(11) NULL,
@@ -171,7 +173,7 @@ suite("test_schema_change_ck") {
assertEquals(partitions.size(), 2)
}
sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (10011, 21, 38,
200), (10010, 20, 39, 200) """
- qt_select_add_partition """select * from ${tableName}"""
+ qt_select_add_partition """select * from ${tableName} partition
(p_20000)"""
/****** one sql contain multi column changes ******/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]