This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c2533c8aca9 [enhance](file cache) clear file cache when load failed (#59864)
c2533c8aca9 is described below

commit c2533c8aca9b1e5c2771366dffb4ac526f3f1b62
Author: hui lai <[email protected]>
AuthorDate: Wed Jan 21 02:25:43 2026 +0800

    [enhance](file cache) clear file cache when load failed (#59864)
    
    ### What problem does this PR solve?
    
    Clear the file cache when a load fails, eliminating invalid cache data
    so that cache space is preserved for valuable data (a minimal sketch of
    the mechanism follows the note below).
    
    **Note: This PR does not handle the case where some rowsets commit
    successfully while others fail, or where the transaction commit itself
    fails; that will be addressed in a separate PR.**
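    
    For context, a minimal standalone sketch of the lifecycle this change
    relies on. Only `RowsetState`, `PREPARED`, and `clear_cache()` mirror
    the BE code touched below (the real check also guards on `_is_init` and
    goes through `rowset_meta()`); the remaining names are illustrative:
    
        #include <memory>
        
        // Illustrative stand-ins for the BE types involved in this change.
        enum class RowsetState { PREPARED, COMMITTED };
        
        struct Rowset {
            RowsetState state = RowsetState::PREPARED;
            void clear_cache() { /* evict this rowset's blocks from the file cache */ }
        };
        
        // Committing moves the rowset out of PREPARED, so a destructor that
        // still sees PREPARED knows the load failed (or was abandoned) and
        // can evict the now-useless cache entries right away instead of
        // leaving them to age out of the cache.
        class RowsetBuilderSketch {
        public:
            void commit() { _rowset->state = RowsetState::COMMITTED; }
            ~RowsetBuilderSketch() {
                if (_rowset && _rowset->state == RowsetState::PREPARED) {
                    _rowset->clear_cache();
                }
            }
        
        private:
            std::shared_ptr<Rowset> _rowset = std::make_shared<Rowset>();
        };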
---
 be/src/cloud/cloud_rowset_builder.cpp              |   7 +-
 .../test_clear_file_cache_on_load_failure.groovy   | 157 +++++++++++++++++++++
 2 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp
index 89bad741531..8ef15424a3d 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -31,7 +31,12 @@ CloudRowsetBuilder::CloudRowsetBuilder(CloudStorageEngine& engine, const WriteRe
                                        RuntimeProfile* profile)
         : BaseRowsetBuilder(req, profile), _engine(engine) {}
 
-CloudRowsetBuilder::~CloudRowsetBuilder() = default;
+CloudRowsetBuilder::~CloudRowsetBuilder() {
+    // A rowset still in PREPARED state here was never committed, i.e. the load failed; clear its file cache entries immediately
+    if (_is_init && _rowset != nullptr && _rowset->rowset_meta()->rowset_state() == PREPARED) {
+        _rowset->clear_cache();
+    }
+}
 
 Status CloudRowsetBuilder::init() {
     _tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id));
diff --git a/regression-test/suites/cloud_p0/cache/test_clear_file_cache_on_load_failure.groovy b/regression-test/suites/cloud_p0/cache/test_clear_file_cache_on_load_failure.groovy
new file mode 100644
index 00000000000..88d89eea0bf
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/test_clear_file_cache_on_load_failure.groovy
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+suite("test_clear_file_cache_on_load_failure", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    // Clear any existing debug points
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    // Helper closure to clear the file cache on a single backend
+    def clearFileCache = { ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true";
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+        // Brief pause to let cache metrics settle after the synchronous clear
+        sleep(5000)
+    }
+
+    // Helper function to get brpc metrics
+    def getBrpcMetrics = { ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics";
+        try {
+            def metrics = new URL(url).text
+            def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+            if (matcher.find()) {
+                return matcher[0][1] as long
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to get brpc metrics from ${ip}:${port}: ${e.message}")
+        }
+        return 0L
+    }
+
+    // Helper function to get index queue cache size
+    def getIndexQueueSize = { ip, port ->
+        return getBrpcMetrics(ip, port, "file_cache_index_queue_cache_size")
+    }
+
+    // Helper function to get total index queue cache size across all backends
+    def getTotalIndexQueueSize = {
+        def backends = sql """SHOW BACKENDS"""
+        long totalSize = 0
+        for (be in backends) {
+            def ip = be[1]
+            def brpcPort = be[5]
+            def size = getIndexQueueSize(ip, brpcPort)
+            totalSize += size
+            logger.info("BE ${ip}:${brpcPort} index_queue_size = ${size}")
+        }
+        return totalSize
+    }
+
+    // Create test table with file cache enabled
+    def tableName = "test_load_failure_cache"
+    sql """DROP TABLE IF EXISTS ${tableName} FORCE"""
+    sql """
+        CREATE TABLE ${tableName} (
+            k1 INT NOT NULL,
+            v1 VARCHAR(100),
+            v2 INT
+        ) UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "disable_auto_compaction" = "true",
+            "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+
+    try {
+        // Disable auto analyze to avoid internal loads affecting cache size
+        sql """SET GLOBAL enable_auto_analyze = false"""
+
+        // Clear file cache and wait for it to complete
+        clearFileCacheOnAllBackends()
+        sleep(3000)
+
+        // Get initial cache size
+        def initialCacheSize = getTotalIndexQueueSize()
+        logger.info("Initial file cache size: ${initialCacheSize}")
+
+        // First, do a successful load to establish baseline
+        sql """INSERT INTO ${tableName} VALUES (1, 'test1', 100)"""
+        // Wait for cache metrics to update
+        sleep(5000)
+
+        def afterSuccessfulLoadSize = getTotalIndexQueueSize()
+        logger.info("Cache size after successful load: 
${afterSuccessfulLoadSize}")
+        assertTrue(afterSuccessfulLoadSize > initialCacheSize,
+            "Cache should increase after successful load. Initial: 
${initialCacheSize}, After: ${afterSuccessfulLoadSize}")
+
+        // Clear cache again to reset
+        clearFileCacheOnAllBackends()
+        sleep(3000)
+
+        def afterClearSize = getTotalIndexQueueSize()
+        logger.info("Cache size after clear: ${afterClearSize}")
+
+        // Enable debug point to make add_batch fail on the BE, simulating a failed load
+        GetDebugPoint().enableDebugPointForAllBEs("LoadChannel.add_batch.failed")
+
+        // Try to insert data - this should fail due to injection point
+        try {
+            sql """INSERT INTO ${tableName} VALUES (2, 'test2', 200)"""
+        } catch (Exception e) {
+            logger.info("Expected load failure occurred: ${e.message}")
+        }
+
+        // Wait for cleanup to complete and cache metrics to update
+        sleep(5000)
+
+        // Get cache size after failed load
+        def afterFailedLoadSize = getTotalIndexQueueSize()
+        logger.info("Cache size after failed load: ${afterFailedLoadSize}")
+
+        // Verify cache size has not increased
+        assertTrue(afterFailedLoadSize == afterClearSize,
+            "Cache should not increase after failed load. " +
+            "Before: ${afterClearSize}, After: ${afterFailedLoadSize}, " +
+            "Difference: ${afterFailedLoadSize - afterClearSize}")
+
+        logger.info("Test passed: File cache was properly cleared after load 
failure")
+    } finally {
+        sql """DROP TABLE IF EXISTS ${tableName} FORCE"""
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}

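For reference, the test's `getBrpcMetrics` helper assumes the BE's
/brpc_metrics endpoint returns newline-delimited `name value` pairs and pulls
the integer out with a regex. A minimal C++ analogue of that extraction, with
an illustrative payload:

    #include <iostream>
    #include <regex>
    #include <string>

    // Find "<name><whitespace><integer>" in a metrics payload and return the
    // integer, or 0 when the metric is absent; this matches the contract of
    // the Groovy helper in the test above.
    long get_metric(const std::string& body, const std::string& name) {
        std::regex pattern(name + R"(\s+(\d+))");
        std::smatch m;
        return std::regex_search(body, m, pattern) ? std::stol(m[1]) : 0L;
    }

    int main() {
        // Illustrative payload; the real endpoint exposes many such lines.
        std::string body = "file_cache_index_queue_cache_size 4096\n";
        std::cout << get_metric(body, "file_cache_index_queue_cache_size") << "\n";
    }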

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
