This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new fefeaf0dc7a branch-3.1: [fix](sc) Skip empty rowset version hole filling for versions <= alter_version #56209 (#56212)
fefeaf0dc7a is described below

commit fefeaf0dc7af8644a3d29ead296bd7dff564638e
Author: Xin Liao <[email protected]>
AuthorDate: Fri Sep 19 15:27:03 2025 +0800

    branch-3.1: [fix](sc) Skip empty rowset version hole filling for versions <= alter_version #56209 (#56212)
    
    cherry pick from #56209
---
 be/src/cloud/cloud_delta_writer.cpp                |   5 +++
 be/src/cloud/cloud_meta_mgr.cpp                    |  37 +++++++++++++++++++--
 be/src/cloud/cloud_schema_change_job.cpp           |  13 +++++++-
 .../test_schema_change_with_empty_rowset.out       | Bin 0 -> 116 bytes
 ...test_schema_change_mow_with_empty_rowset.groovy |   1 +
 ...=> test_schema_change_with_empty_rowset.groovy} |  23 +++++++++----
 6 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/be/src/cloud/cloud_delta_writer.cpp 
b/be/src/cloud/cloud_delta_writer.cpp
index e0f9d203750..0b3818190da 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -26,6 +26,9 @@
 
 namespace doris {
 
+bvar::Adder<int64_t> g_cloud_commit_rowset_count("cloud_commit_rowset_count");
+bvar::Adder<int64_t> 
g_cloud_commit_empty_rowset_count("cloud_commit_empty_rowset_count");
+
 CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const 
WriteRequest& req,
                                    RuntimeProfile* profile, const UniqueId& 
load_id)
         : BaseDeltaWriter(req, profile, load_id), _engine(engine) {
@@ -108,10 +111,12 @@ const RowsetMetaSharedPtr& 
CloudDeltaWriter::rowset_meta() {
 }
 
 Status CloudDeltaWriter::commit_rowset() {
+    g_cloud_commit_rowset_count << 1;
     std::lock_guard<bthread::Mutex> lock(_mtx);
 
     // Handle empty rowset (no data written)
     if (!_is_init) {
+        g_cloud_commit_empty_rowset_count << 1;
         return _commit_empty_rowset();
     }
 
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 1364cab47fb..5be30dcac79 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1706,28 +1706,61 @@ Status CloudMetaMgr::fill_version_holes(CloudTablet* 
tablet, int64_t max_version
                   return a.first < b.first;
               });
 
+    // During schema change, get_tablet operations on new tablets trigger 
sync_tablet_rowsets which calls
+    // fill_version_holes. For schema change tablets (TABLET_NOTREADY state), 
we selectively skip hole
+    // filling for versions <= alter_version to prevent:
+    // 1. Abnormal compaction score calculations for schema change tablets
+    // 2. Unexpected -235 errors during load operations
+    // This allows schema change to proceed normally while still permitting 
hole filling for versions
+    // beyond the alter_version threshold.
+    bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY;
+    if (is_schema_change_tablet && tablet->alter_version() <= 1) {
+        LOG(INFO) << "Skip version hole filling for new schema change tablet "
+                  << tablet->tablet_id() << " with alter_version " << 
tablet->alter_version();
+        return Status::OK();
+    }
+
     int64_t last_version = -1;
     for (const Version& version : existing_versions) {
+        VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() 
<< ": ["
+                    << version.first << ", " << version.second << "]";
         // missing versions are those that are not in the existing_versions
         if (version.first > last_version + 1) {
             // there is a hole between versions
             auto prev_non_hole_rowset = tablet->get_rowset_by_version(version);
             for (int64_t ver = last_version + 1; ver < version.first; ++ver) {
+                // Skip hole filling for versions <= alter_version during 
schema change
+                if (is_schema_change_tablet && ver <= tablet->alter_version()) 
{
+                    continue;
+                }
                 RowsetSharedPtr hole_rowset;
                 RETURN_IF_ERROR(create_empty_rowset_for_hole(
                         tablet, ver, prev_non_hole_rowset->rowset_meta(), 
&hole_rowset));
                 hole_rowsets.push_back(hole_rowset);
             }
             LOG(INFO) << "Created empty rowset for version hole, from " << 
last_version + 1
-                      << " to " << version.first - 1 << " for tablet " << 
tablet->tablet_id();
+                      << " to " << version.first - 1 << " for tablet " << 
tablet->tablet_id()
+                      << (is_schema_change_tablet
+                                  ? (", schema change tablet skipped filling 
versions <= " +
+                                     std::to_string(tablet->alter_version()))
+                                  : "");
         }
         last_version = version.second;
     }
 
     if (last_version + 1 <= max_version) {
         LOG(INFO) << "Created empty rowset for version hole, from " << 
last_version + 1 << " to "
-                  << max_version << " for tablet " << tablet->tablet_id();
+                  << max_version << " for tablet " << tablet->tablet_id()
+                  << (is_schema_change_tablet
+                              ? (", schema change tablet skipped filling 
versions <= " +
+                                 std::to_string(tablet->alter_version()))
+                              : "");
+        // there is a hole after the last existing version
         for (; last_version + 1 <= max_version; ++last_version) {
+            // Skip hole filling for versions <= alter_version during schema 
change
+            if (is_schema_change_tablet && last_version + 1 <= 
tablet->alter_version()) {
+                continue;
+            }
             RowsetSharedPtr hole_rowset;
             auto prev_non_hole_rowset = 
tablet->get_rowset_by_version(existing_versions.back());
             RETURN_IF_ERROR(create_empty_rowset_for_hole(
diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index ec4c8224050..9b629c0b038 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/cloud.pb.h>
 
+#include <algorithm>
 #include <chrono>
 #include <memory>
 #include <random>
@@ -459,12 +460,22 @@ Status 
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
 initiator));
     TabletMetaSharedPtr tmp_meta = 
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
     tmp_meta->delete_bitmap()->delete_bitmap.clear();
-    tmp_meta->clear_rowsets();
+    // Keep only version [0-1] rowset, other rowsets will be added in 
_output_rowsets
+    auto& rs_metas = tmp_meta->all_mutable_rs_metas();
+    rs_metas.erase(std::remove_if(rs_metas.begin(), rs_metas.end(),
+                                  [](const RowsetMetaSharedPtr& rs_meta) {
+                                      return !(rs_meta->version().first == 0 &&
+                                               rs_meta->version().second == 1);
+                                  }),
+                   rs_metas.end());
+
     std::shared_ptr<CloudTablet> tmp_tablet =
             std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
         tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        // Set alter version to let the tmp_tablet can fill hole rowset 
greater than alter_version
+        tmp_tablet->set_alter_version(alter_version);
     }
 
     // step 1, process incremental rowset without delete bitmap update lock
diff --git 
a/regression-test/data/schema_change_p0/test_schema_change_with_empty_rowset.out
 
b/regression-test/data/schema_change_p0/test_schema_change_with_empty_rowset.out
new file mode 100644
index 00000000000..e9555c3d5d7
Binary files /dev/null and 
b/regression-test/data/schema_change_p0/test_schema_change_with_empty_rowset.out
 differ
diff --git 
a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
 
b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
index a0fe1d58321..d7bcf69b3d5 100644
--- 
a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
+++ 
b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
@@ -62,6 +62,7 @@ suite("test_schema_change_mow_with_empty_rowset", "p0") {
     for (int i = 0; i < 20; i++) {
         sql """ insert into ${tableName} values (100, 2, 3, 4, 5, 6.6, 1.7, 
8.8,
     'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+        sleep(20)
     }   
 
     Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, 
TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
diff --git 
a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
 
b/regression-test/suites/schema_change_p0/test_schema_change_with_empty_rowset.groovy
similarity index 80%
copy from 
regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
copy to 
regression-test/suites/schema_change_p0/test_schema_change_with_empty_rowset.groovy
index a0fe1d58321..4f1eaebde6c 100644
--- 
a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
+++ 
b/regression-test/suites/schema_change_p0/test_schema_change_with_empty_rowset.groovy
@@ -18,8 +18,13 @@
 import java.util.concurrent.TimeUnit
 import org.awaitility.Awaitility
 
-suite("test_schema_change_mow_with_empty_rowset", "p0") {
-    def tableName = "test_sc_mow_with_empty_rowset"
+suite("test_schema_change_with_empty_rowset", "p0,nonConcurrent") {
+    def custoBeConfig = [
+        max_tablet_version_num : 100
+    ]
+
+    setBeConfigTemporary(custoBeConfig) {
+    def tableName = "test_sc_with_empty_rowset"
 
     def getJobState = { tbl ->
         def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE 
IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1 """
@@ -44,7 +49,7 @@ suite("test_schema_change_mow_with_empty_rowset", "p0") {
       `k12` date NULL,
       `k13` datetime NULL
     ) ENGINE=OLAP
-    unique KEY(k1, k2, k3)
+    UNIQUE KEY(k1, k2, k3)
     DISTRIBUTED BY HASH(`k1`) BUCKETS 2
     PROPERTIES (
         "replication_allocation" = "tag.location.default: 1",
@@ -57,11 +62,16 @@ suite("test_schema_change_mow_with_empty_rowset", "p0") {
     'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
     }   
 
+
+    // trigger compactions for all tablets in ${tableName}
+    trigger_and_wait_compaction(tableName, "cumulative")
+
     sql """ alter table ${tableName} modify column k4 string NULL"""
 
-    for (int i = 0; i < 20; i++) {
-        sql """ insert into ${tableName} values (100, 2, 3, 4, 5, 6.6, 1.7, 
8.8,
+    for (int i = 100; i < 120; i++) {
+        sql """ insert into ${tableName} values ($i, 2, 3, 4, 5, 6.6, 1.7, 8.8,
     'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+        sleep(20)
     }   
 
     Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, 
TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
@@ -75,6 +85,7 @@ suite("test_schema_change_mow_with_empty_rowset", "p0") {
         }
     )
 
-    qt_sql """ select * from ${tableName} order by k1, k2, k3 """
+    qt_sql """ select sum(k1), sum(k2) from ${tableName} """
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to