This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8ad2c857a41 [fix](cloud) avoid NPE and clear stale cache on warmup job cancel/expire (#62805) (#63019)
8ad2c857a41 is described below

commit 8ad2c857a41d2520d3623f71d804314e150eb89a
Author: Xin Liao <[email protected]>
AuthorDate: Thu May 7 11:31:41 2026 +0800

    [fix](cloud) avoid NPE and clear stale cache on warmup job cancel/expire (#62805) (#63019)
    
    Pick apache/doris#62805
---
 be/src/cloud/cloud_warm_up_manager.cpp             |   2 +-
 .../apache/doris/service/FrontendServiceImpl.java  |   2 +-
 .../doris/service/FrontendServiceImplTest.java     |  41 +++++
 ...est_warm_up_cluster_event_cancel_expired.groovy | 203 +++++++++++++++++++++
 4 files changed, 246 insertions(+), 2 deletions(-)

diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index e609257f74b..ef7a9d5fa94 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -540,7 +540,7 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id
 
         auto st = Status::create(result.status);
         if (!st.ok()) {
-            if (st.is<CANCELED>()) {
+            if (st.is<ErrorCode::CANCELLED>()) {
                 LOG(INFO) << "get_replica_info: warm up job cancelled, tablet_id=" << tablet_id
                           << ", job_id=" << job_id;
                 cancelled_jobs.push_back(job_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f8815f6b750..9a4527827cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2800,7 +2800,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     .getCloudWarmUpJob(request.getWarmUpJobId());
             if (job == null || job.isDone()) {
                 LOG.info("warmup job {} is not running, notify caller BE {} to cancel job",
-                        job.getJobId(), clientAddr);
+                        request.getWarmUpJobId(), clientAddr);
                 // notify client to cancel this job
                 result.setStatus(new TStatus(TStatusCode.CANCELLED));
                 return result;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
index 21d6dd9b1b7..aa551584b76 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
@@ -21,6 +21,8 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.CacheHotspotManager;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.PrintableMap;
@@ -40,6 +42,8 @@ import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
 import org.apache.doris.thrift.TFetchSchemaTableDataResult;
 import org.apache.doris.thrift.TGetDbsParams;
 import org.apache.doris.thrift.TGetDbsResult;
+import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
+import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
 import org.apache.doris.thrift.TMaxComputeBlockIdRequest;
 import org.apache.doris.thrift.TMaxComputeBlockIdResult;
 import org.apache.doris.thrift.TMetadataTableRequestParams;
@@ -59,11 +63,13 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -343,4 +349,39 @@ public class FrontendServiceImplTest {
             Env.getCurrentEnv().getAuthenticationIntegrationMgr().dropAuthenticationIntegration(integrationName, true);
         }
     }
+
+    @Test
+    public void testGetTabletReplicaInfosNullJobReturnsCancelledWithoutNpe() {
+        String originalCloudUniqueId = Config.cloud_unique_id;
+        Config.cloud_unique_id = "gettabletreplicainfostest";
+
+        CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+        CacheHotspotManager cacheHotspotManager = Mockito.mock(CacheHotspotManager.class);
+        Mockito.when(cloudEnv.getCacheHotspotMgr()).thenReturn(cacheHotspotManager);
+        Mockito.when(cacheHotspotManager.getCloudWarmUpJob(123456L)).thenReturn(null);
+
+        MockedStatic<Env> envMock = Mockito.mockStatic(Env.class);
+        try {
+            envMock.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+
+            FrontendServiceImpl frontendService = new FrontendServiceImpl(exeEnv);
+            TGetTabletReplicaInfosRequest request = new TGetTabletReplicaInfosRequest();
+            request.setTabletIds(Collections.singletonList(789L));
+            request.setWarmUpJobId(123456L);
+
+            TGetTabletReplicaInfosResult result;
+            try {
+                result = frontendService.getTabletReplicaInfos(request);
+            } catch (NullPointerException e) {
+                throw new AssertionError("getTabletReplicaInfos must not NPE when the "
+                        + "warm-up job has been removed from CacheHotspotManager", e);
+            }
+
+            Assert.assertNotNull(result.getStatus());
+            Assert.assertEquals(TStatusCode.CANCELLED, result.getStatus().getStatusCode());
+        } finally {
+            envMock.close();
+            Config.cloud_unique_id = originalCloudUniqueId;
+        }
+    }
 }
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy
new file mode 100644
index 00000000000..31c28cbf2b7
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_cancel_expired.groovy
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+// Covers a two-part bug on the event-driven warm-up path:
+//   1. BE side (cloud_warm_up_manager.cpp): `st.is<CANCELED>()` used the
+//      proto enum value PCacheStatus::CANCELED=9 instead of
+//      ErrorCode::CANCELLED=1, so BE never cleared a cancelled job from
+//      `_tablet_replica_cache`.
+//   2. FE side (FrontendServiceImpl.getTabletReplicaInfos): when the job
+//      had been removed from `cloudWarmUpJobs` (after
+//      history_cloud_warm_up_job_keep_max_second), the branch still
+//      called `job.getJobId()` on a null reference, throwing NPE to BE.
+//
+// After the fix, once a warm-up job is cancelled BE must drop it from
+// its cache on the next RPC (via the CANCELLED TStatus), so that later
+// expiry removal on FE never receives follow-up requests with the dead
+// job_id and no NPE path is exercised.
+suite('test_warm_up_cluster_event_cancel_expired', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        // Keep expiry small so FE removes the cancelled job quickly.
+        'history_cloud_warm_up_job_keep_max_second=30',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'file_cache_background_monitor_interval_ms=1000',
+    ]
+    options.cloudMode = true
+
+    def clearFileCache = { ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true";
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+        for (be in backends) {
+            clearFileCache(be[1], be[4])
+        }
+        sleep(10000)
+    }
+
+    def getBrpcMetrics = { ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics";
+        if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) {
+            url = url.replace("http://", "https://") + " --cert " + context.config.otherConfigs.get("trustCert") + " --cacert " + context.config.otherConfigs.get("trustCACert") + " --key " + context.config.otherConfigs.get("trustCAKey")
+        }
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+
+    def getSkippedRowsetSum = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll {
+            it[19].contains("""\"compute_group_name\" : \"${cluster}\"""")
+        }
+        long sum = 0
+        for (be in cluster_bes) {
+            sum += getBrpcMetrics(be[1], be[5], "file_cache_event_driven_warm_up_skipped_rowset_num")
+        }
+        return sum
+    }
+
+    docker(options) {
+        def clusterSrc = "warmup_source"
+        def clusterDst = "warmup_target"
+
+        cluster.addBackend(1, clusterSrc)
+        cluster.addBackend(1, clusterDst)
+
+        sql """use @${clusterSrc}"""
+        sql """CREATE TABLE IF NOT EXISTS t_exp (
+                   id INT, v STRING
+               ) DUPLICATE KEY(id)
+                 DISTRIBUTED BY HASH(id) BUCKETS 3
+                 PROPERTIES ("file_cache_ttl_seconds" = "3600")"""
+
+        // 1. Start event-driven LOAD warm-up job; this is the only mode
+        //    that populates `_tablet_replica_cache[jobId]` on BE.
+        def jobRows = sql """
+            WARM UP CLUSTER ${clusterDst} WITH CLUSTER ${clusterSrc}
+            PROPERTIES (
+                "sync_mode"  = "event_driven",
+                "sync_event" = "load"
+            )
+        """
+        def jobId = jobRows[0][0]
+        logger.info("event-driven warm-up jobId=${jobId}")
+        clearFileCacheOnAllBackends()
+        sleep(15000)
+
+        // 2. Drive some loads so BE caches the job_id in _tablet_replica_cache
+        //    (we don't care about the warmed data itself for this test).
+        for (int i = 0; i < 20; i++) {
+            sql """INSERT INTO t_exp VALUES (${i}, 'x')"""
+        }
+        sleep(5000)
+
+        // 3. Cancel the job. After the fix, BE sees TStatus.CANCELLED on
+        //    the next get_replica_info and drops the job from
+        //    _tablet_replica_cache; before the fix (CANCELED typo) it
+        //    would keep the entry.
+        sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+        def st = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals("CANCELLED", st[0][3])
+
+        // 4. One more batch so BE actually sees the CANCELLED status
+        //    and (with the fix) purges its cache entry.
+        for (int i = 0; i < 20; i++) {
+            sql """INSERT INTO t_exp VALUES (${100 + i}, 'y')"""
+        }
+        sleep(5000)
+
+        // 5. Baseline for the skipped-rowset counter. After BE has
+        //    cleaned its cache, subsequent warm_up_rowset calls return
+        //    early (empty replicas -> "skipping rowset") and bump this
+        //    counter on every commit. If the typo is unfixed the counter
+        //    stays flat because BE keeps calling FE.
+        def skippedBaseline = getSkippedRowsetSum(clusterDst)
+        logger.info("skipped_rowset baseline=${skippedBaseline}")
+
+        // 6. Wait past history_cloud_warm_up_job_keep_max_second plus one
+        //    JobDaemon cycle (~20s, CYCLE_COUNT_TO_CHECK_EXPIRE=20 ticks
+        //    at 1s each) so FE removes the cancelled job from its map.
+        logger.info("waiting for FE to expire+remove cancelled job")
+        def removed = false
+        for (int i = 0; i < 60; i++) {
+            def rows = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+            if (rows.isEmpty()) {
+                removed = true
+                logger.info("job ${jobId} removed from FE after ${i}s")
+                break
+            }
+            sleep(1000)
+        }
+        assertTrue(removed, "FE should have removed expired warm-up job")
+
+        // 7. After FE removal, run more loads. Buggy code path:
+        //    BE still has job_id in _tablet_replica_cache -> RPC to FE ->
+        //    FE's LOG.info("...", job.getJobId(), ...) NPE's on
+        //    `job == null` -> BE falls into the 2s thrift exception
+        //    sleep *inside* CloudWarmUpManager::_mtx, serialising every
+        //    commit_rowset and blowing up heavy_work_pool.
+        //
+        //    Fixed code path: BE cache already cleaned in step 4, so
+        //    warm_up_rowset takes the empty-replicas fast path, loads
+        //    stay fast, and FE never gets called with the dead job_id.
+        def t0 = System.currentTimeMillis()
+        for (int i = 0; i < 20; i++) {
+            sql """INSERT INTO t_exp VALUES (${200 + i}, 'z')"""
+        }
+        def elapsedMs = System.currentTimeMillis() - t0
+        logger.info("20 INSERTs after FE removal took ${elapsedMs}ms")
+
+        // 20 INSERTs, each commit would cost ~2s of serialised sleep in
+        // the buggy case (>= 40s). Threshold of 30s is a generous
+        // upper bound that the fixed path comfortably meets but the
+        // buggy path cannot.
+        assertTrue(elapsedMs < 30_000,
+                "post-removal inserts should not be blocked by NPE sleeps, " +
+                "took ${elapsedMs}ms")
+
+        // 8. On the fixed path every commit short-circuits through
+        //    g_file_cache_event_driven_warm_up_skipped_rowset_num.
+        //    We expect it to grow; on the buggy path it would be flat
+        //    since BE never stopped pursuing FE replicas.
+        def skippedAfter = getSkippedRowsetSum(clusterDst)
+        logger.info("skipped_rowset after=${skippedAfter}")
+        assertTrue(skippedAfter > skippedBaseline,
+                "BE should skip warm_up_rowset for tablets after cancel+expire, " +
+                "baseline=${skippedBaseline} after=${skippedAfter}")
+
+        sql """DROP TABLE IF EXISTS t_exp"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to