This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new af3cfab6064 branch-4.1: [improvement](fe) Add configurable return mode for insert publish timeout in ETL scenarios (#64583)
af3cfab6064 is described below

commit af3cfab6064620c0f0f4b077c6c32d870e7799a4
Author: Wen Zhenghu <[email protected]>
AuthorDate: Sat Jun 20 21:46:00 2026 +0800

    branch-4.1: [improvement](fe) Add configurable return mode for insert publish timeout in ETL scenarios (#64583)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #63919
    
    Problem Summary:
    Normal internal-table inserts currently treat publish timeout as a
    committed insert and return success with COMMITTED status. This behavior
    is acceptable when clients only care that the transaction has been
    committed and can tolerate delayed visibility, but it is unsafe for
    pipelines whose downstream steps depend on the inserted data already
    being visible.
    
    A typical case is ETL workflows that first use CREATE TABLE AS SELECT to
    build a temporary table and then immediately read that table to populate
    a downstream result table. If the upstream transaction has been
    committed but is not yet VISIBLE, the downstream step may temporarily
    read no rows and silently write empty data into the final table, so the
    whole pipeline appears successful even though the result is incorrect.
    
    Doris already returns an error in explicit transaction mode when a
    COMMIT statement times out before the transaction becomes visible. This
    change adds a compatible mode for the regular non-transactional
    internal-table insert path by introducing a session variable,
    insert_visible_timeout_return_mode, so users can choose whether publish
    timeout should keep returning COMMITTED or return ERR.
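    
    A minimal sketch of the two return modes described above, using
    simplified stand-in types. ReturnMode, ClientState, parse and
    responseFor are hypothetical names chosen for illustration; the real
    logic lives in SessionVariable and OlapInsertExecutor and is not
    reproduced here.
    
    ```java
    import java.util.Locale;

    // Illustrative only: these types are stand-ins, not the Doris classes.
    class InsertVisibleTimeoutModeSketch {
        enum ReturnMode { COMMITTED, ERROR }
        enum ClientState { OK, ERR }

        // Case-insensitive parsing that rejects empty or unknown values.
        static ReturnMode parse(String raw) {
            if (raw == null || raw.isEmpty()) {
                throw new IllegalArgumentException("mode value is empty");
            }
            try {
                return ReturnMode.valueOf(raw.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("mode value is invalid: " + raw);
            }
        }

        // visibleInTime == false models a publish timeout after a successful commit.
        static ClientState responseFor(boolean visibleInTime, ReturnMode mode) {
            if (visibleInTime) {
                return ClientState.OK;
            }
            return mode == ReturnMode.ERROR ? ClientState.ERR : ClientState.OK;
        }

        public static void main(String[] args) {
            System.out.println(responseFor(false, parse("committed"))); // OK
            System.out.println(responseFor(false, parse("ERROR")));     // ERR
            System.out.println(responseFor(true, parse("error")));      // OK
        }
    }
    ```
    
    In this model the default committed mode still answers OK on publish
    timeout, and only the error mode turns the same timeout into ERR.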
    
    The implementation also keeps committed-side bookkeeping unchanged in
    error mode so finished load jobs, insert result metadata, and related
    accounting still reflect the real transaction state.
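    
    The ordering described above (bookkeeping first, error conversion last)
    can be sketched as follows. This is a simplified model under stated
    assumptions: Outcome, finish and the step strings are hypothetical and
    only illustrate the sequence, not the actual executor code.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: a simplified model, not the OlapInsertExecutor code.
    class PublishTimeoutBookkeepingSketch {
        enum TxnStatus { VISIBLE, COMMITTED }

        // Hypothetical holder for what was recorded and what the client sees.
        static final class Outcome {
            final TxnStatus recordedStatus;
            final boolean clientError;
            final List<String> steps;

            Outcome(TxnStatus recordedStatus, boolean clientError, List<String> steps) {
                this.recordedStatus = recordedStatus;
                this.clientError = clientError;
                this.steps = steps;
            }
        }

        static Outcome finish(boolean publishedInTime, boolean returnErrorOnTimeout) {
            List<String> steps = new ArrayList<>();
            TxnStatus status = publishedInTime ? TxnStatus.VISIBLE : TxnStatus.COMMITTED;
            // Bookkeeping always uses the real transaction status.
            steps.add("record finished load job: " + status);
            steps.add("store insert result: " + status);
            // Only after bookkeeping is the client response converted to an error.
            boolean clientError = !publishedInTime && returnErrorOnTimeout;
            if (clientError) {
                steps.add("set client response to ERR");
            }
            return new Outcome(status, clientError, steps);
        }

        public static void main(String[] args) {
            Outcome o = finish(false, true);
            // prints "COMMITTED clientError=true steps=3"
            System.out.println(o.recordedStatus + " clientError=" + o.clientError
                    + " steps=" + o.steps.size());
        }
    }
    ```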
    
    This is a branch-4.1 backport of #63919. The code was adapted manually
    because the original patch conflicted with branch-4.1 around
    SessionVariable annotations and tests:
    
    - Keep the original OlapInsertExecutor behavior change.
    - Adapt the new session variable to branch-4.1's VariableMgr.VarAttr
    annotation path.
    - Migrate the FE unit tests and regression test, with the regression SQL
    adjusted to branch-4.1 repository test style.
    
    ### Release note
    
    Add a session variable to control whether normal internal-table inserts
    return COMMITTED or ERR when publish visibility times out.
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [x] Regression test
            - Added
              regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
            - Not run locally because no dedicated local branch-4.1 FE/BE
              cluster was available. The existing regression config points
              to an external cluster, so it was not used.
        - [x] Unit Test
            - ./run-fe-ut.sh --run
              org.apache.doris.qe.SessionVariablesTest,org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutorTest
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../plans/commands/insert/OlapInsertExecutor.java  |  16 ++
 .../java/org/apache/doris/qe/SessionVariable.java  |  71 ++++++
 .../commands/insert/OlapInsertExecutorTest.java    | 274 +++++++++++++++++++++
 .../org/apache/doris/qe/SessionVariablesTest.java  |  84 +++++++
 .../test_insert_visible_timeout_return_mode.out    |   5 +
 .../test_preload_external_metadata_profile.groovy  | 195 ---------------
 .../test_insert_visible_timeout_return_mode.groovy |  82 ++++++
 7 files changed, 532 insertions(+), 195 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index e02d94a1014..3771c415c17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -46,6 +46,7 @@ import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
@@ -78,7 +79,12 @@ import java.util.stream.Collectors;
  */
 public class OlapInsertExecutor extends AbstractInsertExecutor {
     private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class);
+    // Keep the timeout message aligned with the client-facing error returned by the legacy insert path.
+    private static final String INSERT_VISIBLE_TIMEOUT_ERROR_MSG = "transaction commit successfully, "
+            + "BUT data did not become visible within insert_visible_timeout_ms and will be visible later.";
     protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
+    // Track publish timeout separately from real failures so committed bookkeeping still runs.
+    protected boolean publishTimedOutAfterCommit = false;
 
     protected OlapTable olapTable;
 
@@ -229,7 +235,9 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
                 ctx.getSessionVariable().getInsertVisibleTimeoutMs(), txnCommitAttachment)) {
             txnStatus = TransactionStatus.VISIBLE;
         } else {
+            // Keep the committed status so load accounting and insert result bookkeeping stay aligned.
             txnStatus = TransactionStatus.COMMITTED;
+            publishTimedOutAfterCommit = true;
         }
         if (Config.isCloudMode()) {
             String clusterName = ctx.getCloudCluster();
@@ -364,6 +372,14 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
                 txnStatus, loadedRows, filteredRows);
         // update it, so that user can get loaded rows in fe.audit.log
         ctx.updateReturnRows((int) loadedRows);
+        if (publishTimedOutAfterCommit && ctx.getSessionVariable().isInsertVisibleTimeoutReturnError()) {
+            // Log the committed timeout branch explicitly so operators can distinguish it from real failures.
+            LOG.warn("insert [{}] with txn id {} committed but return error because {}={}",
+                    labelName, txnId, SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE,
+                    SessionVariable.InsertVisibleTimeoutReturnMode.ERROR);
+            // Convert the final client response to ERR after all committed-side bookkeeping has finished.
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, INSERT_VISIBLE_TIMEOUT_ERROR_MSG);
+        }
     }
 
     public long getTimeout() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 81feb7242ab..97cd3bbb583 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -266,6 +266,30 @@ public class SessionVariable implements Serializable, Writable {
 
     // max ms to wait transaction publish finish when exec insert stmt.
     public static final String INSERT_VISIBLE_TIMEOUT_MS = "insert_visible_timeout_ms";
+    public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE = "insert_visible_timeout_return_mode";
+    public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED = "committed";
+    public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR = "error";
+
+    // Keep the mode enum for business logic while storing the session value as a string.
+    public enum InsertVisibleTimeoutReturnMode {
+        COMMITTED(INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED),
+        ERROR(INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR);
+
+        private final String option;
+
+        InsertVisibleTimeoutReturnMode(String option) {
+            this.option = option;
+        }
+
+        public String getOption() {
+            return option;
+        }
+
+        @Override
+        public String toString() {
+            return option;
+        }
+    }
 
     public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition";
 
@@ -1060,6 +1084,15 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS, needForward = true)
     public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
 
+    // Control whether publish timeout keeps the committed response or returns an explicit error.
+    @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_RETURN_MODE, needForward = true,
+            checker = "checkInsertVisibleTimeoutReturnMode", setter = "setInsertVisibleTimeoutReturnMode",
+            description = {"控制普通内表 INSERT 在 publish timeout 时返回给客户端的状态。",
+                    "Controls the status returned to the client when a normal internal-table INSERT times out "
+                            + "while waiting for publish visibility."},
+            options = {INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED, INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR})
+    public String insertVisibleTimeoutReturnMode = INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED;
+
     // max memory used on every backend. Default value to 100G.
     @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
     public long maxExecMemByte = 100147483648L;
@@ -4807,6 +4840,23 @@ public class SessionVariable implements Serializable, Writable {
         }
     }
 
+    public String getInsertVisibleTimeoutReturnMode() {
+        return getInsertVisibleTimeoutReturnModeEnum().getOption();
+    }
+
+    public InsertVisibleTimeoutReturnMode getInsertVisibleTimeoutReturnModeEnum() {
+        return parseInsertVisibleTimeoutReturnMode(insertVisibleTimeoutReturnMode);
+    }
+
+    public boolean isInsertVisibleTimeoutReturnError() {
+        return getInsertVisibleTimeoutReturnModeEnum() == InsertVisibleTimeoutReturnMode.ERROR;
+    }
+
+    public void setInsertVisibleTimeoutReturnMode(String insertVisibleTimeoutReturnMode) {
+        this.insertVisibleTimeoutReturnMode = parseInsertVisibleTimeoutReturnMode(insertVisibleTimeoutReturnMode)
+                .getOption();
+    }
+
     public boolean getIsSingleSetVar() {
         return isSingleSetVar;
     }
@@ -5195,6 +5245,27 @@ public class SessionVariable implements Serializable, Writable {
         }
     }
 
+    public void checkInsertVisibleTimeoutReturnMode(String mode) {
+        // Reuse the parser so validation stays consistent with assignment and enum access.
+        parseInsertVisibleTimeoutReturnMode(mode);
+    }
+
+    // Parse the stored string case-insensitively and expose the enum only to business logic.
+    private InsertVisibleTimeoutReturnMode parseInsertVisibleTimeoutReturnMode(String mode) {
+        if (StringUtils.isEmpty(mode)) {
+            LOG.warn("insertVisibleTimeoutReturnMode value is empty");
+            throw new UnsupportedOperationException("insertVisibleTimeoutReturnMode value is empty");
+        }
+        for (InsertVisibleTimeoutReturnMode value : InsertVisibleTimeoutReturnMode.values()) {
+            if (value.getOption().equalsIgnoreCase(mode)) {
+                return value;
+            }
+        }
+        LOG.warn("insertVisibleTimeoutReturnMode value is invalid, the invalid value is {}", mode);
+        throw new UnsupportedOperationException(
+                "insertVisibleTimeoutReturnMode value is invalid, the invalid value is " + mode);
+    }
+
     public void checkMaxExecutionTimeMSValid(String newValue) {
         int value = Integer.valueOf(newValue);
         if (value < 1000) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
new file mode 100644
index 00000000000..ea5318eedcb
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.profile.ExecutionProfile;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HiveTransactionMgr;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.job.manager.StreamingTaskManager;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.InsertResult;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgrIface;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Optional;
+
+/**
+ * Tests for publish-timeout behaviors in {@link OlapInsertExecutor}.
+ */
+class OlapInsertExecutorTest {
+
+    @AfterEach
+    void tearDown() {
+        ConnectContext.remove();
+    }
+
+    @Test
+    void testExecuteSingleInsertPublishTimeoutReturnErrorKeepsCommittedAccounting() throws Exception {
+        ConnectContext ctx = createExecutorContext();
+        ctx.getSessionVariable().setInsertVisibleTimeoutReturnMode(
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR);
+
+        Coordinator coordinator = createCoordinator();
+        GlobalTransactionMgrIface txnMgr = Mockito.mock(GlobalTransactionMgrIface.class);
+        TransactionState txnState = Mockito.mock(TransactionState.class);
+        LoadManager loadManager = Mockito.mock(LoadManager.class);
+        Env currentEnv = createCurrentEnv(loadManager);
+        StmtExecutor stmtExecutor = createStmtExecutor();
+
+        try (MockedStatic<EnvFactory> envFactoryMock = Mockito.mockStatic(EnvFactory.class);
+                MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+            prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr, txnState, currentEnv);
+            ctx.setEnv(currentEnv);
+
+            Mockito.when(txnMgr.commitAndPublishTransaction(
+                    Mockito.any(), Mockito.anyList(), Mockito.anyLong(), Mockito.anyList(), Mockito.anyLong(),
+                    Mockito.isNull())).thenReturn(false);
+
+            OlapInsertExecutor executor = createExecutor(ctx);
+            executor.txnId = 10001L;
+            executor.executeSingleInsert(stmtExecutor);
+
+            Assertions.assertEquals(TransactionStatus.COMMITTED, executor.txnStatus);
+            Assertions.assertEquals(MysqlStateType.ERR, ctx.getState().getStateType());
+            Assertions.assertTrue(ctx.getState().getErrorMessage().contains(
+                    "transaction commit successfully, BUT data did not become visible within "
+                            + "insert_visible_timeout_ms and will be visible later."));
+
+            InsertResult insertResult = ctx.getInsertResult();
+            Assertions.assertNotNull(insertResult);
+            Assertions.assertEquals(TransactionStatus.COMMITTED, insertResult.txnStatus);
+            Assertions.assertEquals(12L, insertResult.loadedRows);
+            Assertions.assertEquals(1L, insertResult.filteredRows);
+            Assertions.assertEquals(12L, ctx.getReturnRows());
+
+            Mockito.verify(loadManager).recordFinishedLoadJob(Mockito.eq("label_test"), Mockito.eq(10001L),
+                    Mockito.eq("test_db"), Mockito.eq(2L), Mockito.eq(EtlJobType.INSERT), Mockito.anyLong(),
+                    Mockito.eq(""), Mockito.isNull(), Mockito.isNull(), Mockito.eq(UserIdentity.ROOT),
+                    Mockito.anyLong());
+            Mockito.verify(txnMgr, Mockito.never()).abortTransaction(Mockito.anyLong(), Mockito.anyLong(),
+                    Mockito.anyString());
+        }
+    }
+
+    @Test
+    void testPublishTimeoutCommittedModeReturnsOk() throws Exception {
+        ConnectContext ctx = createExecutorContext();
+        Coordinator coordinator = createCoordinator();
+        GlobalTransactionMgrIface txnMgr = Mockito.mock(GlobalTransactionMgrIface.class);
+        TransactionState txnState = Mockito.mock(TransactionState.class);
+        LoadManager loadManager = Mockito.mock(LoadManager.class);
+        Env currentEnv = createCurrentEnv(loadManager);
+        StmtExecutor stmtExecutor = createStmtExecutor();
+
+        try (MockedStatic<EnvFactory> envFactoryMock = Mockito.mockStatic(EnvFactory.class);
+                MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+            prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr, txnState, currentEnv);
+            ctx.setEnv(currentEnv);
+
+            Mockito.when(txnMgr.commitAndPublishTransaction(
+                    Mockito.any(), Mockito.anyList(), Mockito.anyLong(), Mockito.anyList(), Mockito.anyLong(),
+                    Mockito.isNull())).thenReturn(false);
+
+            OlapInsertExecutor executor = createExecutor(ctx);
+            executor.txnId = 10002L;
+            executor.executeSingleInsert(stmtExecutor);
+
+            Assertions.assertEquals(TransactionStatus.COMMITTED, executor.txnStatus);
+            Assertions.assertEquals(MysqlStateType.OK, ctx.getState().getStateType());
+            Assertions.assertTrue(ctx.getState().getInfoMessage().contains("'status':'COMMITTED'"));
+
+            InsertResult insertResult = ctx.getInsertResult();
+            Assertions.assertNotNull(insertResult);
+            Assertions.assertEquals(TransactionStatus.COMMITTED, insertResult.txnStatus);
+            Assertions.assertEquals(12L, insertResult.loadedRows);
+            Assertions.assertEquals(1L, insertResult.filteredRows);
+            Assertions.assertEquals(12L, ctx.getReturnRows());
+
+            Mockito.verify(txnMgr, Mockito.never()).abortTransaction(Mockito.anyLong(), Mockito.anyLong(),
+                    Mockito.anyString());
+        }
+    }
+
+    @Test
+    void testOnFailAbortsUncommittedTransaction() throws Exception {
+        ConnectContext ctx = createExecutorContext();
+        Coordinator coordinator = createCoordinator();
+        GlobalTransactionMgrIface txnMgr = Mockito.mock(GlobalTransactionMgrIface.class);
+        TransactionState txnState = Mockito.mock(TransactionState.class);
+        LoadManager loadManager = Mockito.mock(LoadManager.class);
+        Env currentEnv = createCurrentEnv(loadManager);
+
+        try (MockedStatic<EnvFactory> envFactoryMock = Mockito.mockStatic(EnvFactory.class);
+                MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+            prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr, txnState, currentEnv);
+            ctx.setEnv(currentEnv);
+
+            // Simulate a pre-commit failure so the executor must abort the transaction.
+            OlapInsertExecutor executor = createExecutor(ctx);
+            executor.txnId = 10003L;
+            executor.txnStatus = TransactionStatus.ABORTED;
+
+            executor.onFail(new RuntimeException("pre-commit failure"));
+
+            Assertions.assertEquals(MysqlStateType.ERR, ctx.getState().getStateType());
+            Assertions.assertTrue(ctx.getState().getErrorMessage().contains("pre-commit failure"));
+            Assertions.assertNull(ctx.getInsertResult());
+            Mockito.verify(txnMgr).abortTransaction(Mockito.eq(1L), Mockito.eq(10003L),
+                    Mockito.eq("pre-commit failure"));
+        }
+    }
+
+    // Build a fresh context per case so insertResult and QueryState do not leak between tests.
+    private ConnectContext createExecutorContext() {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setThreadLocalInfo();
+        ctx.setCurrentUserIdentity(UserIdentity.ROOT);
+        ctx.setQueryId(new TUniqueId(1, 2));
+        // Disable strict insert mode because this test intentionally keeps one filtered row.
+        ctx.getSessionVariable().setEnableInsertStrict(false);
+        ctx.getState().reset();
+        ctx.resetReturnRows();
+        return ctx;
+    }
+
+    // Prepare the mocked coordinator so the executor can run its completion logic without real execution.
+    private Coordinator createCoordinator() throws Exception {
+        Coordinator coordinator = Mockito.mock(Coordinator.class);
+        // Return non-null query options because master registers the coordinator in QeProcessor.
+        TQueryOptions queryOptions = new TQueryOptions();
+        Mockito.when(coordinator.join(Mockito.anyInt())).thenReturn(true);
+        Mockito.when(coordinator.isDone()).thenReturn(true);
+        Mockito.when(coordinator.getExecStatus()).thenReturn(new Status(TStatusCode.OK, ""));
+        Mockito.when(coordinator.getQueryOptions()).thenReturn(queryOptions);
+        Mockito.when(coordinator.getCommitInfos()).thenReturn(Lists.newArrayList());
+        Mockito.when(coordinator.getTrackingUrl()).thenReturn(null);
+        Mockito.when(coordinator.getFirstErrorMsg()).thenReturn(null);
+        Mockito.when(coordinator.getExecutionProfile()).thenReturn(Mockito.mock(ExecutionProfile.class));
+        Mockito.when(coordinator.getLoadCounters()).thenReturn(ImmutableMap.of(
+                "dpp.norm.ALL", "12",
+                "dpp.abnorm.ALL", "1"));
+        return coordinator;
+    }
+
+    // Use a mocked executor so executeSingleInsert can run the real control flow without a full query setup.
+    private StmtExecutor createStmtExecutor() {
+        StmtExecutor stmtExecutor = Mockito.mock(StmtExecutor.class);
+        Mockito.when(stmtExecutor.getProfile()).thenReturn(Mockito.mock(Profile.class));
+        Mockito.when(stmtExecutor.getSummaryProfile()).thenReturn(Mockito.mock(SummaryProfile.class));
+        Mockito.when(stmtExecutor.getOriginStmtInString()).thenReturn("insert into test_tbl select 1");
+        Mockito.when(stmtExecutor.getParsedStmt()).thenReturn(null);
+        Mockito.when(stmtExecutor.isProfileSafeStmt()).thenReturn(false);
+        return stmtExecutor;
+    }
+
+    // Provide the job-manager chain needed by master-side setTxnCallbackId().
+    private Env createCurrentEnv(LoadManager loadManager) {
+        Env currentEnv = Mockito.mock(Env.class);
+        // Mock the internal catalog because ConnectContext.setEnv() resolves the default catalog on master.
+        InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
+        JobManager<?, ?> jobManager = Mockito.mock(JobManager.class);
+        StreamingTaskManager streamingTaskManager = Mockito.mock(StreamingTaskManager.class);
+        Mockito.when(currentEnv.getInternalCatalog()).thenReturn(internalCatalog);
+        Mockito.when(internalCatalog.getName()).thenReturn("internal");
+        Mockito.when(currentEnv.getLoadManager()).thenReturn(loadManager);
+        Mockito.when(currentEnv.getJobManager()).thenReturn(jobManager);
+        Mockito.when(jobManager.getStreamingTaskManager()).thenReturn(streamingTaskManager);
+        Mockito.when(streamingTaskManager.getStreamingInsertTaskById(Mockito.anyLong())).thenReturn(null);
+        return currentEnv;
+    }
+
+    // Create an executor with mocked table metadata because this test only validates timeout result handling.
+    private OlapInsertExecutor createExecutor(ConnectContext ctx) {
+        Database database = Mockito.mock(Database.class);
+        Mockito.when(database.getFullName()).thenReturn("test_db");
+        Mockito.when(database.getId()).thenReturn(1L);
+
+        // Mock OlapTable because the master-side executor now casts the target table to OlapTable.
+        OlapTable table = Mockito.mock(OlapTable.class);
+        Mockito.when(table.getDatabase()).thenReturn(database);
+        Mockito.when(table.getName()).thenReturn("test_tbl");
+        Mockito.when(table.getId()).thenReturn(2L);
+
+        return new OlapInsertExecutor(ctx, table, "label_test", Mockito.mock(NereidsPlanner.class),
+                Optional.empty(), false, 0L);
+    }
+
+    // Redirect coordinator creation and transaction access to mocks so the test stays deterministic.
+    private void prepareFactoryMocks(MockedStatic<EnvFactory> envFactoryMock, MockedStatic<Env> envMock,
+            Coordinator coordinator, GlobalTransactionMgrIface txnMgr, TransactionState txnState, Env currentEnv) {
+        EnvFactory envFactory = Mockito.mock(EnvFactory.class);
+        HiveTransactionMgr hiveTransactionMgr = Mockito.mock(HiveTransactionMgr.class);
+        envFactoryMock.when(EnvFactory::getInstance).thenReturn(envFactory);
+        Mockito.when(envFactory.createCoordinator(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong()))
+                .thenReturn(coordinator);
+
+        envMock.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+        envMock.when(Env::getCurrentHiveTransactionMgr).thenReturn(hiveTransactionMgr);
+        envMock.when(Env::getCurrentEnv).thenReturn(currentEnv);
+        Mockito.when(txnMgr.getTransactionState(Mockito.anyLong(), Mockito.anyLong())).thenReturn(txnState);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index e8f7d7c0ed2..09d141279f4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.SetVar;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.utframe.TestWithFeService;
@@ -32,6 +33,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.Map;
 
 public class SessionVariablesTest extends TestWithFeService {
@@ -66,8 +68,13 @@ public class SessionVariablesTest extends TestWithFeService {
         Assertions.assertEquals(numOfForwardVars, vars.size());
 
         vars.put(SessionVariable.ENABLE_PROFILE, "true");
+        vars.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, "ERROR");
         sessionVariable.setForwardedSessionVariables(vars);
         Assertions.assertTrue(sessionVariable.enableProfile);
+        Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                sessionVariable.getInsertVisibleTimeoutReturnMode());
+        Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                sessionVariable.getInsertVisibleTimeoutReturnModeEnum());
     }
 
     @Test
@@ -82,6 +89,83 @@ public class SessionVariablesTest extends TestWithFeService {
                 sessionVariableClone.getSessionOriginValue().get(txIsolationSessionVariableField));
     }
 
+    @Test
+    public void testInsertVisibleTimeoutReturnMode() throws Exception {
+        connectContext.setThreadLocalInfo();
+        SessionVariable sessionVar = connectContext.getSessionVariable();
+
+        VariableMgr.setVar(sessionVar, new SetVar(SetType.SESSION,
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, new 
StringLiteral("ERROR")));
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnModeEnum());
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                
sessionVar.getForwardVariables().get(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE));
+
+        SessionVariable restored = new SessionVariable();
+        
restored.readFromJson("{\"insert_visible_timeout_return_mode\":\"ERROR\"}");
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                restored.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                restored.getInsertVisibleTimeoutReturnModeEnum());
+
+        // Verify map restore keeps accepting canonical string tokens without extra normalization hooks.
+        Map<String, String> restoredMap = new HashMap<>();
+        restoredMap.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, "ERROR");
+        restored.readFromMap(restoredMap);
+        Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                restored.getInsertVisibleTimeoutReturnMode());
+        Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                restored.getInsertVisibleTimeoutReturnModeEnum());
+
+        Map<String, String> forwardVars = sessionVar.getForwardVariables();
+        forwardVars.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, "ERROR");
+        restored.setForwardedSessionVariables(forwardVars);
+        Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                restored.getInsertVisibleTimeoutReturnMode());
+        Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                restored.getInsertVisibleTimeoutReturnModeEnum());
+
+        Field field = SessionVariable.class.getDeclaredField("insertVisibleTimeoutReturnMode");
+        VariableMgr.VarAttr varAttr = field.getAnnotation(VariableMgr.VarAttr.class);
+        Assertions.assertArrayEquals(new String[] {
+                "控制普通内表 INSERT 在 publish timeout 时返回给客户端的状态。",
+                "Controls the status returned to the client when a normal internal-table INSERT times out "
+                        + "while waiting for publish visibility."
+        }, varAttr.description());
+        Assertions.assertArrayEquals(new String[] {
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED,
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR
+        }, varAttr.options());
+
+        ExceptionChecker.expectThrowsWithMsg(DdlException.class,
+                "insertVisibleTimeoutReturnMode value is invalid",
+                () -> VariableMgr.setVar(sessionVar, new SetVar(SetType.SESSION,
+                        SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, new StringLiteral("unexpected"))));
+    }
+
+    @Test
+    public void testInsertVisibleTimeoutReturnModeDefaultsAndCheckerBranches() {
+        // Cover the default branch and the helper methods used by setter/checker paths.
+        SessionVariable sessionVar = new SessionVariable();
+        Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED,
+                sessionVar.getInsertVisibleTimeoutReturnMode());
+        Assertions.assertFalse(sessionVar.isInsertVisibleTimeoutReturnError());
+
+        // Verify setter normalization is case-insensitive and stores the canonical value.
+        sessionVar.setInsertVisibleTimeoutReturnMode("ErRoR");
+        Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnMode());
+        Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnModeEnum());
+        Assertions.assertTrue(sessionVar.isInsertVisibleTimeoutReturnError());
+
+        ExceptionChecker.expectThrowsWithMsg(UnsupportedOperationException.class,
+                "insertVisibleTimeoutReturnMode value is empty",
+                () -> sessionVar.checkInsertVisibleTimeoutReturnMode(""));
+    }
+
     @Test
     public void testSetVarInHint() {
         String sql = "insert into test_t1 select /*+ set_var(enable_nereids_dml_with_pipeline=false)*/ * from test_t1 where enable_nereids_dml_with_pipeline=true";
diff --git a/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out b/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out
new file mode 100644
index 00000000000..fa5640a23c0
--- /dev/null
+++ b/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !final_select --
+1      10
+2      20
+
diff --git a/regression-test/suites/external_table_p0/jdbc/test_preload_external_metadata_profile.groovy b/regression-test/suites/external_table_p0/jdbc/test_preload_external_metadata_profile.groovy
deleted file mode 100644
index 5bcebb7da9e..00000000000
--- a/regression-test/suites/external_table_p0/jdbc/test_preload_external_metadata_profile.groovy
+++ /dev/null
@@ -1,195 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-import java.util.regex.Pattern
-
-suite("test_preload_external_metadata_profile", "p0,external,doris,external_docker,external_docker_doris") {
-    String dbName = "regression_test_preload_external_metadata_profile"
-    String catalogOff = "preload_external_metadata_profile_off"
-    String catalogOn = "preload_external_metadata_profile_on"
-    String internalTable = "preload_profile_internal"
-    String externalTablePrefix = "preload_profile_jdbc"
-    int externalTableCount = 10
-    int extraColumnCount = 60
-    int randomSuffix = Math.random() * 2000000000
-
-    String jdbcUrl = context.config.jdbcUrl
-    String jdbcUser = context.config.jdbcUser
-    String jdbcPassword = context.config.jdbcPassword
-    String s3Endpoint = getS3Endpoint()
-    String bucket = getS3BucketName()
-    String driverUrl = "https://${bucket}.${s3Endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
-    Map<String, Map<String, Integer>> profileMetrics = [:]
-
-    def parseTimeMs = { String timeText ->
-        if (timeText == "0") {
-            return 0
-        }
-        int totalMs = 0
-        def matcher = Pattern.compile("(\\d+)(hour|min|sec|ms)").matcher(timeText)
-        while (matcher.find()) {
-            int value = Integer.parseInt(matcher.group(1))
-            String unit = matcher.group(2)
-            if (unit == "hour") {
-                totalMs += value * 60 * 60 * 1000
-            } else if (unit == "min") {
-                totalMs += value * 60 * 1000
-            } else if (unit == "sec") {
-                totalMs += value * 1000
-            } else if (unit == "ms") {
-                totalMs += value
-            }
-        }
-        if (totalMs == 0) {
-            fail("Could not parse time counter: ${timeText}")
-        }
-        return totalMs
-    }
-
-    def extractCounterMs = { String profileString, String counterName ->
-        String flexibleCounterName = counterName.split(" ").collect { Pattern.quote(it) }.join("\\s+")
-        Pattern pattern = Pattern.compile(flexibleCounterName + "\\s*:\\s*([0-9]+(?:hour|min|sec|ms)?(?:[0-9]+(?:min|sec|ms))*)")
-        def matcher = pattern.matcher(profileString)
-        if (!matcher.find()) {
-            fail("Could not find profile counter: ${counterName}\n${profileString}")
-        }
-        return parseTimeMs(matcher.group(1))
-    }
-
-    def buildJdbcCatalog = { String catalogName ->
-        sql """ DROP CATALOG IF EXISTS ${catalogName} """
-        sql """
-            CREATE CATALOG `${catalogName}` PROPERTIES (
-                "user" = "${jdbcUser}",
-                "type" = "jdbc",
-                "password" = "${jdbcPassword}",
-                "jdbc_url" = "${jdbcUrl}",
-                "driver_url" = "${driverUrl}",
-                "driver_class" = "com.mysql.cj.jdbc.Driver"
-            )
-        """
-    }
-
-    def buildQuery = { String catalogName ->
-        String selectColumns = (1..externalTableCount).collect { idx ->
-            "j${idx}.c1 AS j${idx}_c1, j${idx}.c${extraColumnCount} AS j${idx}_c${extraColumnCount}"
-        }.join(",\n")
-        String joins = (1..externalTableCount).collect { idx ->
-            "JOIN ${catalogName}.${dbName}.${externalTablePrefix}_${idx} j${idx} ON i.id = j${idx}.id"
-        }.join("\n")
-        return """
-            SELECT i.id,
-                   ${selectColumns}
-            FROM ${internalTable} i
-            ${joins}
-            ORDER BY i.id
-        """
-    }
-
-    def checkPreloadProfile = { String tag, String catalogName, boolean enablePreload, boolean expectPreload ->
-        sql """ SET enable_preload_external_metadata = ${enablePreload} """
-        profile(tag) {
-            run {
-                def result = sql """ /* ${tag} */ ${buildQuery(catalogName)} """
-                assertEquals(2, result.size())
-            }
-
-            check { profileString, exception ->
-                if (exception != null) {
-                    throw exception
-                }
-                assertTrue(profileString.contains("PhysicalJdbcScan"))
-                int preloadMs = extractCounterMs(profileString, "Nereids Preload External Metadata Time")
-                int lockMs = extractCounterMs(profileString, "Nereids Lock Table Time")
-                int analysisMs = extractCounterMs(profileString, "Nereids Analysis Time")
-                profileMetrics[tag] = [
-                        preloadMs: preloadMs,
-                        lockMs: lockMs,
-                        analysisMs: analysisMs
-                ]
-                log.info("preload external metadata profile: tag={}, preloadMs={}, lockMs={}, analysisMs={}",
-                        tag, preloadMs, lockMs, analysisMs)
-                if (expectPreload) {
-                    assertTrue("preload metadata time should be recorded in preload counter", preloadMs > 0)
-                } else {
-                    assertEquals(0, preloadMs)
-                }
-                assertTrue("lock table counter should be non-negative", lockMs >= 0)
-            }
-        }
-    }
-
-    try {
-        sql """ SET enable_nereids_planner = true """
-        sql """ SET enable_fallback_to_original_planner = false """
-        sql """ DROP CATALOG IF EXISTS ${catalogOff} """
-        sql """ DROP CATALOG IF EXISTS ${catalogOn} """
-        sql """ DROP DATABASE IF EXISTS ${dbName} FORCE """
-        sql """ CREATE DATABASE ${dbName} """
-        sql """ USE ${dbName} """
-
-        sql """
-            CREATE TABLE ${internalTable} (
-                id INT,
-                v INT
-            )
-            DISTRIBUTED BY HASH(id) BUCKETS 1
-            PROPERTIES("replication_num" = "1")
-        """
-        sql """ INSERT INTO ${internalTable} VALUES (1, 10), (2, 20), (3, 30) """
-
-        String extraColumns = (1..extraColumnCount).collect { idx -> "`c${idx}` VARCHAR(20)" }.join(",\n")
-        String extraValues = (1..extraColumnCount).collect { idx -> "'v${idx}'" }.join(", ")
-        for (int i = 1; i <= externalTableCount; i++) {
-            sql """
-                CREATE TABLE ${externalTablePrefix}_${i} (
-                    id INT,
-                    ${extraColumns}
-                )
-                DISTRIBUTED BY HASH(id) BUCKETS 1
-                PROPERTIES("replication_num" = "1")
-            """
-            sql """
-                INSERT INTO ${externalTablePrefix}_${i}
-                VALUES (1, ${extraValues}), (3, ${extraValues})
-            """
-        }
-
-        buildJdbcCatalog(catalogOff)
-        buildJdbcCatalog(catalogOn)
-
-        String offTag = "preload_external_metadata_profile_off_${randomSuffix}"
-        String onTag = "preload_external_metadata_profile_on_${randomSuffix}"
-        checkPreloadProfile(offTag, catalogOff, false, false)
-        checkPreloadProfile(onTag, catalogOn, true, true)
-        Map<String, Integer> offMetrics = profileMetrics[offTag]
-        Map<String, Integer> onMetrics = profileMetrics[onTag]
-        assertTrue(offMetrics != null)
-        assertTrue(onMetrics != null)
-        int analysisDropMs = offMetrics.analysisMs - onMetrics.analysisMs
-        int allowedNegativeJitterMs = Math.max(20, (int) (onMetrics.preloadMs * 0.1))
-        log.info("preload external metadata profile comparison: analysisDropMs={}, preloadMs={}, "
-                + "allowedNegativeJitterMs={}", analysisDropMs, onMetrics.preloadMs, allowedNegativeJitterMs)
-        assertTrue("analysis time should not increase meaningfully when metadata is preloaded",
-                analysisDropMs + allowedNegativeJitterMs >= 0)
-    } finally {
-        sql """ SET enable_preload_external_metadata = false """
-        sql """ DROP CATALOG IF EXISTS ${catalogOff} """
-        sql """ DROP CATALOG IF EXISTS ${catalogOn} """
-        sql """ DROP DATABASE IF EXISTS ${dbName} FORCE """
-    }
-}
diff --git a/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy b/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
new file mode 100644
index 00000000000..c203fbf412b
--- /dev/null
+++ b/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite("test_insert_visible_timeout_return_mode", "nonConcurrent") {
+    if (isCloudMode()) {
+        return
+    }
+
+    def debugPoint = "PublishVersionDaemon.stop_publish"
+    // Use the configured FE HTTP endpoint so the case also works when SHOW FRONTENDS exposes loopback addresses.
+    def feHttpAddress = context.config.feHttpAddress
+    def feHost = feHttpAddress.split(":")[0]
+    def feHttpPort = Integer.parseInt(feHttpAddress.split(":")[1])
+
+    // Prepare a single-replica table so publish blocking deterministically drives the visible timeout path.
+    sql """ DROP TABLE IF EXISTS test_insert_visible_timeout_return_mode_tbl FORCE """
+    sql """
+        CREATE TABLE test_insert_visible_timeout_return_mode_tbl (
+            `k1` INT,
+            `k2` INT
+        )
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    try {
+        // Block FE publish so inserts can commit but remain non-visible until the debug point is removed.
+        DebugPoint.enableDebugPoint(feHost, feHttpPort, NodeType.FE, debugPoint)
+
+        sql """ SET insert_visible_timeout_ms = 1000 """
+
+        // Verify the default committed mode returns success after the visible wait times out.
+        sql """ SET insert_visible_timeout_return_mode = 'committed' """
+        sql """ INSERT INTO test_insert_visible_timeout_return_mode_tbl VALUES (1, 10) """
+
+        // Verify the error mode returns the publish-timeout error to the client while keeping the txn committed.
+        sql """ SET insert_visible_timeout_return_mode = 'error' """
+        test {
+            sql """ INSERT INTO test_insert_visible_timeout_return_mode_tbl VALUES (2, 20) """
+            exception "transaction commit successfully, BUT data did not become visible within insert_visible_timeout_ms and will be visible later."
+        }
+    } finally {
+        try {
+            DebugPoint.disableDebugPoint(feHost, feHttpPort, NodeType.FE, debugPoint)
+        } catch (Throwable e) {
+            logger.warn("Failed to disable debug point ${debugPoint}", e)
+        }
+    }
+
+    // Wait for FE publish to resume so both committed transactions become visible before checking final data.
+    def visible = false
+    for (int i = 0; i < 15; i++) {
+        def rowCount = sql """ SELECT COUNT(*) FROM test_insert_visible_timeout_return_mode_tbl """
+        if ((rowCount[0][0] as long) == 2L) {
+            visible = true
+            break
+        }
+        sleep(1000)
+    }
+    assertTrue(visible, "Rows should become visible after publish resumes")
+
+    order_qt_final_select """ SELECT * FROM test_insert_visible_timeout_return_mode_tbl ORDER BY k1 """
+}
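
The unit tests in this diff exercise a case-insensitive setter and a checker that rejects empty or unknown values. A minimal sketch of that behavior, under stated assumptions, might look like the following. `ReturnModeSketch`, `Mode`, and `parse` are hypothetical names for illustration only and are not the actual `SessionVariable` implementation; the real code path also surfaces invalid values as a `DdlException` through `VariableMgr.setVar`, which is not modeled here.

```java
import java.util.Locale;

// Hypothetical stand-in; not the real org.apache.doris.qe.SessionVariable.
class ReturnModeSketch {
    // Assumed to mirror the two options asserted in the test (COMMITTED, ERROR).
    enum Mode { COMMITTED, ERROR }

    // Normalizes the input case-insensitively and rejects empty or unknown values.
    static Mode parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new UnsupportedOperationException("insertVisibleTimeoutReturnMode value is empty");
        }
        try {
            return Mode.valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Enum.valueOf throws IllegalArgumentException for unknown names.
            throw new UnsupportedOperationException(
                    "insertVisibleTimeoutReturnMode value is invalid: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("ErRoR"));
        System.out.println(parse("committed"));
    }
}
```

Parsing into an enum once keeps later checks (such as deciding whether a publish timeout should raise an error) independent of how the user capitalized the value.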

