This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8688f24b227fe531c04024eaebe48d3a3b645b36 Author: xy720 <[email protected]> AuthorDate: Sat Apr 20 23:02:28 2024 +0800 [enhancement](cloud)avoid transaction too large when dropping partition (#33700) --- cloud/src/meta-service/meta_service_partition.cpp | 47 +++------------------- cloud/test/meta_service_test.cpp | 30 ++------------ .../cloud/datasource/CloudInternalCatalog.java | 12 ++++-- gensrc/proto/cloud.proto | 1 + 4 files changed, 19 insertions(+), 71 deletions(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 2469271a868..8d092d14737 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -600,47 +600,12 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll if (!need_commit) return; // Update table version only when deleting non-empty partitions - if (request->has_db_id()) { - bool need_update_table_version = false; - for (auto part_id : request->partition_ids()) { - std::string partition_ver_key = partition_version_key( - {instance_id, request->db_id(), request->table_id(), part_id}); - std::string ver_val_str; - err = txn->get(partition_ver_key, &ver_val_str); - if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { - code = cast_as<ErrCategory::READ>(err); - ss << "failed to get partition version, table_id=" << request->table_id() - << " partition_id=" << part_id << " key=" << hex(partition_ver_key) - << " err=" << err; - msg = ss.str(); - LOG_WARNING(msg); - return; - } - int64_t part_version = -1; - if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { - // may be it is an empty partition - part_version = 1; - } else { - VersionPB pb; - if (!pb.ParseFromString(ver_val_str)) { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "malformed partition version value"; - LOG_WARNING(msg).tag("partition_id", part_id); - return; - } - part_version = pb.version(); - } - if 
(part_version > 1) { - need_update_table_version = true; - break; - } - } - if (need_update_table_version) { - std::string ver_key = - table_version_key({instance_id, request->db_id(), request->table_id()}); - txn->atomic_add(ver_key, 1); - LOG_INFO("update table version").tag("ver_key", hex(ver_key)); - } + if (request->has_db_id() && request->has_need_update_table_version() && + request->need_update_table_version()) { + std::string ver_key = + table_version_key({instance_id, request->db_id(), request->table_id()}); + txn->atomic_add(ver_key, 1); + LOG_INFO("update table version").tag("ver_key", hex(ver_key)); } err = txn->commit(); diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 60cbf3bda64..b2b203f66e0 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -4494,8 +4494,6 @@ TEST(MetaServiceTest, PartitionRequest) { auto partition_key = recycle_partition_key({instance_id, partition_id}); int64_t val_int = 0; auto tbl_version_key = table_version_key({instance_id, 1, table_id}); - VersionPB version_pb; - auto part_version_key = partition_version_key({instance_id, 1, table_id, partition_id}); std::string val; // ------------Test prepare partition------------ brpc::Controller ctrl; @@ -4687,16 +4685,12 @@ TEST(MetaServiceTest, PartitionRequest) { req.set_table_id(table_id); req.add_index_ids(index_id); req.add_partition_ids(partition_id); + req.set_need_update_table_version(true); // Last state UNKNOWN res.Clear(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); - version_pb.set_version(100); - val = version_pb.SerializeAsString(); - ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); - txn->put(part_version_key, val); - ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); meta_service->drop_partition(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), 
MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4713,11 +4707,6 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); - version_pb.set_version(100); - val = version_pb.SerializeAsString(); - ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); - txn->put(part_version_key, val); - ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::PREPARED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4735,15 +4724,11 @@ TEST(MetaServiceTest, PartitionRequest) { val_int = *reinterpret_cast<const int64_t*>(val.data()); ASSERT_EQ(val_int, 2); // Last state PREPARED but drop an empty partition + req.set_need_update_table_version(false); reset_meta_service(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); - version_pb.set_version(1); - val = version_pb.SerializeAsString(); - ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); - txn->put(part_version_key, val); - ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::PREPARED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4762,14 +4747,10 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(val_int, 1); // Last state DROPPED reset_meta_service(); + req.set_need_update_table_version(true); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); - version_pb.set_version(100); - val = version_pb.SerializeAsString(); - 
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); - txn->put(part_version_key, val); - ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::DROPPED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4791,11 +4772,6 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->atomic_add(tbl_version_key, 1); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); - version_pb.set_version(100); - val = version_pb.SerializeAsString(); - ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); - txn->put(part_version_key, val); - ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::RECYCLING); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index a46fddb2a4b..1cd3dcc0a7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -619,6 +619,7 @@ public class CloudInternalCatalog extends InternalCatalog { long tableId = -1; List<Long> partitionIds = Lists.newArrayList(); Set<Long> indexIds = new HashSet<>(); + boolean needUpdateTableVersion = false; for (Partition partition : partitions) { for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { indexIds.add(index.getId()); @@ -627,6 +628,10 @@ public class CloudInternalCatalog extends InternalCatalog { } } partitionIds.add(partition.getId()); + if (partition.hasData()) { + // Update table version only when deleting non-empty partitions + needUpdateTableVersion = true; + } } 
CloudPartition partition0 = (CloudPartition) partitions.get(0); @@ -640,7 +645,7 @@ public class CloudInternalCatalog extends InternalCatalog { } try { dropCloudPartition(partition0.getDbId(), tableId, partitionIds, - indexIds.stream().collect(Collectors.toList())); + indexIds.stream().collect(Collectors.toList()), needUpdateTableVersion); } catch (Exception e) { LOG.warn("failed to drop partition {} of table {}, try cnt {}, execption {}", partitionIds, tableId, tryCnt, e); @@ -655,14 +660,15 @@ public class CloudInternalCatalog extends InternalCatalog { } } - private void dropCloudPartition(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds) - throws DdlException { + private void dropCloudPartition(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds, + boolean needUpdateTableVersion) throws DdlException { Cloud.PartitionRequest.Builder partitionRequestBuilder = Cloud.PartitionRequest.newBuilder(); partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); partitionRequestBuilder.setTableId(tableId); partitionRequestBuilder.addAllPartitionIds(partitionIds); partitionRequestBuilder.addAllIndexIds(indexIds); + partitionRequestBuilder.setNeedUpdateTableVersion(needUpdateTableVersion); if (dbId > 0) { partitionRequestBuilder.setDbId(dbId); } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 9a5705bc781..91e08368a07 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -902,6 +902,7 @@ message PartitionRequest { repeated int64 index_ids = 4; optional int64 expiration = 5; optional int64 db_id = 6; + optional bool need_update_table_version = 7; } message PartitionResponse { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
