This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8688f24b227fe531c04024eaebe48d3a3b645b36
Author: xy720 <[email protected]>
AuthorDate: Sat Apr 20 23:02:28 2024 +0800

    [enhancement](cloud)avoid transaction too large when dropping partition (#33700)
---
 cloud/src/meta-service/meta_service_partition.cpp  | 47 +++-------------------
 cloud/test/meta_service_test.cpp                   | 30 ++------------
 .../cloud/datasource/CloudInternalCatalog.java     | 12 ++++--
 gensrc/proto/cloud.proto                           |  1 +
 4 files changed, 19 insertions(+), 71 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp
index 2469271a868..8d092d14737 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -600,47 +600,12 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
     if (!need_commit) return;
 
     // Update table version only when deleting non-empty partitions
-    if (request->has_db_id()) {
-        bool need_update_table_version = false;
-        for (auto part_id : request->partition_ids()) {
-            std::string partition_ver_key = partition_version_key(
-                    {instance_id, request->db_id(), request->table_id(), 
part_id});
-            std::string ver_val_str;
-            err = txn->get(partition_ver_key, &ver_val_str);
-            if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
-                code = cast_as<ErrCategory::READ>(err);
-                ss << "failed to get partition version, table_id=" << 
request->table_id()
-                   << " partition_id=" << part_id << " key=" << 
hex(partition_ver_key)
-                   << " err=" << err;
-                msg = ss.str();
-                LOG_WARNING(msg);
-                return;
-            }
-            int64_t part_version = -1;
-            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-                // may be it is an empty partition
-                part_version = 1;
-            } else {
-                VersionPB pb;
-                if (!pb.ParseFromString(ver_val_str)) {
-                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                    msg = "malformed partition version value";
-                    LOG_WARNING(msg).tag("partition_id", part_id);
-                    return;
-                }
-                part_version = pb.version();
-            }
-            if (part_version > 1) {
-                need_update_table_version = true;
-                break;
-            }
-        }
-        if (need_update_table_version) {
-            std::string ver_key =
-                    table_version_key({instance_id, request->db_id(), 
request->table_id()});
-            txn->atomic_add(ver_key, 1);
-            LOG_INFO("update table version").tag("ver_key", hex(ver_key));
-        }
+    if (request->has_db_id() && request->has_need_update_table_version() &&
+        request->need_update_table_version()) {
+        std::string ver_key =
+                table_version_key({instance_id, request->db_id(), 
request->table_id()});
+        txn->atomic_add(ver_key, 1);
+        LOG_INFO("update table version").tag("ver_key", hex(ver_key));
     }
 
     err = txn->commit();
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 60cbf3bda64..b2b203f66e0 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -4494,8 +4494,6 @@ TEST(MetaServiceTest, PartitionRequest) {
     auto partition_key = recycle_partition_key({instance_id, partition_id});
     int64_t val_int = 0;
     auto tbl_version_key = table_version_key({instance_id, 1, table_id});
-    VersionPB version_pb;
-    auto part_version_key = partition_version_key({instance_id, 1, table_id, 
partition_id});
     std::string val;
     // ------------Test prepare partition------------
     brpc::Controller ctrl;
@@ -4687,16 +4685,12 @@ TEST(MetaServiceTest, PartitionRequest) {
     req.set_table_id(table_id);
     req.add_index_ids(index_id);
     req.add_partition_ids(partition_id);
+    req.set_need_update_table_version(true);
     // Last state UNKNOWN
     res.Clear();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     txn->atomic_add(tbl_version_key, 1);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
-    version_pb.set_version(100);
-    val = version_pb.SerializeAsString();
-    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
-    txn->put(part_version_key, val);
-    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     meta_service->drop_partition(&ctrl, &req, &res, nullptr);
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
@@ -4713,11 +4707,6 @@ TEST(MetaServiceTest, PartitionRequest) {
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     txn->atomic_add(tbl_version_key, 1);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
-    version_pb.set_version(100);
-    val = version_pb.SerializeAsString();
-    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
-    txn->put(part_version_key, val);
-    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     partition_pb.set_state(RecyclePartitionPB::PREPARED);
     val = partition_pb.SerializeAsString();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
@@ -4735,15 +4724,11 @@ TEST(MetaServiceTest, PartitionRequest) {
     val_int = *reinterpret_cast<const int64_t*>(val.data());
     ASSERT_EQ(val_int, 2);
     // Last state PREPARED but drop an empty partition
+    req.set_need_update_table_version(false);
     reset_meta_service();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     txn->atomic_add(tbl_version_key, 1);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
-    version_pb.set_version(1);
-    val = version_pb.SerializeAsString();
-    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
-    txn->put(part_version_key, val);
-    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     partition_pb.set_state(RecyclePartitionPB::PREPARED);
     val = partition_pb.SerializeAsString();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
@@ -4762,14 +4747,10 @@ TEST(MetaServiceTest, PartitionRequest) {
     ASSERT_EQ(val_int, 1);
     // Last state DROPPED
     reset_meta_service();
+    req.set_need_update_table_version(true);
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     txn->atomic_add(tbl_version_key, 1);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
-    version_pb.set_version(100);
-    val = version_pb.SerializeAsString();
-    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
-    txn->put(part_version_key, val);
-    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     partition_pb.set_state(RecyclePartitionPB::DROPPED);
     val = partition_pb.SerializeAsString();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
@@ -4791,11 +4772,6 @@ TEST(MetaServiceTest, PartitionRequest) {
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     txn->atomic_add(tbl_version_key, 1);
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
-    version_pb.set_version(100);
-    val = version_pb.SerializeAsString();
-    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
-    txn->put(part_version_key, val);
-    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
     partition_pb.set_state(RecyclePartitionPB::RECYCLING);
     val = partition_pb.SerializeAsString();
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index a46fddb2a4b..1cd3dcc0a7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -619,6 +619,7 @@ public class CloudInternalCatalog extends InternalCatalog {
         long tableId = -1;
         List<Long> partitionIds = Lists.newArrayList();
         Set<Long> indexIds = new HashSet<>();
+        boolean needUpdateTableVersion = false;
         for (Partition partition : partitions) {
             for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                 indexIds.add(index.getId());
@@ -627,6 +628,10 @@ public class CloudInternalCatalog extends InternalCatalog {
                 }
             }
             partitionIds.add(partition.getId());
+            if (partition.hasData()) {
+                // Update table version only when deleting non-empty partitions
+                needUpdateTableVersion = true;
+            }
         }
 
         CloudPartition partition0 = (CloudPartition) partitions.get(0);
@@ -640,7 +645,7 @@ public class CloudInternalCatalog extends InternalCatalog {
             }
             try {
                 dropCloudPartition(partition0.getDbId(), tableId, partitionIds,
-                        indexIds.stream().collect(Collectors.toList()));
+                        indexIds.stream().collect(Collectors.toList()), 
needUpdateTableVersion);
             } catch (Exception e) {
                 LOG.warn("failed to drop partition {} of table {}, try cnt {}, 
execption {}",
                         partitionIds, tableId, tryCnt, e);
@@ -655,14 +660,15 @@ public class CloudInternalCatalog extends InternalCatalog {
         }
     }
 
-    private void dropCloudPartition(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds)
-            throws DdlException {
+    private void dropCloudPartition(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds,
+                                    boolean needUpdateTableVersion) throws 
DdlException {
         Cloud.PartitionRequest.Builder partitionRequestBuilder =
                 Cloud.PartitionRequest.newBuilder();
         partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
         partitionRequestBuilder.setTableId(tableId);
         partitionRequestBuilder.addAllPartitionIds(partitionIds);
         partitionRequestBuilder.addAllIndexIds(indexIds);
+        
partitionRequestBuilder.setNeedUpdateTableVersion(needUpdateTableVersion);
         if (dbId > 0) {
             partitionRequestBuilder.setDbId(dbId);
         }
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 9a5705bc781..91e08368a07 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -902,6 +902,7 @@ message PartitionRequest {
     repeated int64 index_ids = 4;
     optional int64 expiration = 5;
     optional int64 db_id = 6;
+    optional bool need_update_table_version = 7;
 }
 
 message PartitionResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to